diff --git a/pkg/scheduler/core/generic_scheduler.go b/pkg/scheduler/core/generic_scheduler.go index 134fcc0b8fa..276aadf5127 100644 --- a/pkg/scheduler/core/generic_scheduler.go +++ b/pkg/scheduler/core/generic_scheduler.go @@ -1024,7 +1024,9 @@ func (g *genericScheduler) selectNodesForPreemption( if meta != nil { metaCopy = meta.ShallowCopy() } - pods, numPDBViolations, fits := g.selectVictimsOnNode(pluginContext, pod, metaCopy, nodeNameToInfo[nodeName], fitPredicates, queue, pdbs) + pluginContextClone := pluginContext.Clone() + pods, numPDBViolations, fits := g.selectVictimsOnNode( + pluginContextClone, pod, metaCopy, nodeNameToInfo[nodeName], fitPredicates, queue, pdbs) if fits { resultLock.Lock() victims := schedulerapi.Victims{ @@ -1117,6 +1119,10 @@ func (g *genericScheduler) selectVictimsOnNode( return err } } + status := g.framework.RunPreFilterUpdaterRemovePod(pluginContext, pod, rp, nodeInfoCopy) + if !status.IsSuccess() { + return status.AsError() + } return nil } addPod := func(ap *v1.Pod) error { @@ -1126,6 +1132,10 @@ func (g *genericScheduler) selectVictimsOnNode( return err } } + status := g.framework.RunPreFilterUpdaterAddPod(pluginContext, pod, ap, nodeInfoCopy) + if !status.IsSuccess() { + return status.AsError() + } return nil } // As the first step, remove all the lower priority pods from the node and diff --git a/pkg/scheduler/framework/v1alpha1/BUILD b/pkg/scheduler/framework/v1alpha1/BUILD index 4a3e532426a..71f141e6141 100644 --- a/pkg/scheduler/framework/v1alpha1/BUILD +++ b/pkg/scheduler/framework/v1alpha1/BUILD @@ -52,6 +52,7 @@ go_test( deps = [ "//pkg/scheduler/apis/config:go_default_library", "//pkg/scheduler/apis/config/scheme:go_default_library", + "//pkg/scheduler/nodeinfo:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", diff --git a/pkg/scheduler/framework/v1alpha1/context.go b/pkg/scheduler/framework/v1alpha1/context.go index 80667dd70cc..ade1661d5d2 100644 --- a/pkg/scheduler/framework/v1alpha1/context.go +++ b/pkg/scheduler/framework/v1alpha1/context.go @@ -53,8 +53,12 @@ func NewPluginContext() *PluginContext { } } -// Clone creates a copy of PluginContext and returns its pointer. +// Clone creates a copy of PluginContext and returns its pointer. Clone returns +// nil if the context being cloned is nil. func (c *PluginContext) Clone() *PluginContext { + if c == nil { + return nil + } copy := NewPluginContext() for k, v := range c.storage { copy.Write(k, v.Clone()) diff --git a/pkg/scheduler/framework/v1alpha1/context_test.go b/pkg/scheduler/framework/v1alpha1/context_test.go index 0ddac49ab81..ed4139f5ec7 100644 --- a/pkg/scheduler/framework/v1alpha1/context_test.go +++ b/pkg/scheduler/framework/v1alpha1/context_test.go @@ -64,3 +64,11 @@ func TestPluginContextClone(t *testing.T) { t.Errorf("cloned copy should not change, got %q, expected %q", v.data, data1) } } + +func TestPluginContextCloneNil(t *testing.T) { + var pc *PluginContext + pcCopy := pc.Clone() + if pcCopy != nil { + t.Errorf("clone expected to be nil") + } +} diff --git a/pkg/scheduler/framework/v1alpha1/framework.go b/pkg/scheduler/framework/v1alpha1/framework.go index 641f27d46ae..27f206ed96d 100644 --- a/pkg/scheduler/framework/v1alpha1/framework.go +++ b/pkg/scheduler/framework/v1alpha1/framework.go @@ -306,6 +306,46 @@ func (f *framework) RunPreFilterPlugins( return nil } +// RunPreFilterUpdaterAddPod calls the AddPod interface for the set of configured +// PreFilter plugins. It returns directly if any of the plugins return any +// status other than Success. +func (f *framework) RunPreFilterUpdaterAddPod(pc *PluginContext, podToSchedule *v1.Pod, + podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status { + for _, pl := range f.preFilterPlugins { + if updater := pl.Updater(); updater != nil { + status := updater.AddPod(pc, podToSchedule, podToAdd, nodeInfo) + if !status.IsSuccess() { + msg := fmt.Sprintf("error while running AddPod for plugin %q while scheduling pod %q: %v", + pl.Name(), podToSchedule.Name, status.Message()) + klog.Error(msg) + return NewStatus(Error, msg) + } + } + } + + return nil +} + +// RunPreFilterUpdaterRemovePod calls the RemovePod interface for the set of configured +// PreFilter plugins. It returns directly if any of the plugins return any +// status other than Success. +func (f *framework) RunPreFilterUpdaterRemovePod(pc *PluginContext, podToSchedule *v1.Pod, + podToRemove *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status { + for _, pl := range f.preFilterPlugins { + if updater := pl.Updater(); updater != nil { + status := updater.RemovePod(pc, podToSchedule, podToRemove, nodeInfo) + if !status.IsSuccess() { + msg := fmt.Sprintf("error while running RemovePod for plugin %q while scheduling pod %q: %v", + pl.Name(), podToSchedule.Name, status.Message()) + klog.Error(msg) + return NewStatus(Error, msg) + } + } + } + + return nil +} + // RunFilterPlugins runs the set of configured Filter plugins for pod on // the given node. If any of these plugins doesn't return "Success", the // given node is not suitable for running pod. diff --git a/pkg/scheduler/framework/v1alpha1/framework_test.go b/pkg/scheduler/framework/v1alpha1/framework_test.go index df5cd1ab8f8..1c8dba9a714 100644 --- a/pkg/scheduler/framework/v1alpha1/framework_test.go +++ b/pkg/scheduler/framework/v1alpha1/framework_test.go @@ -25,13 +25,16 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/kubernetes/pkg/scheduler/apis/config" + schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo" ) const ( - scoreWithNormalizePlugin1 = "score-with-normalize-plugin-1" - scoreWithNormalizePlugin2 = "score-with-normalize-plugin-2" - scorePlugin1 = "score-plugin-1" - pluginNotImplementingScore = "plugin-not-implementing-score" + scoreWithNormalizePlugin1 = "score-with-normalize-plugin-1" + scoreWithNormalizePlugin2 = "score-with-normalize-plugin-2" + scorePlugin1 = "score-plugin-1" + pluginNotImplementingScore = "plugin-not-implementing-score" + preFilterPluginName = "prefilter-plugin" + preFilterWithUpdaterPluginName = "prefilter-with-updater-plugin" ) // TestScoreWithNormalizePlugin implements ScoreWithNormalizePlugin interface. @@ -105,6 +108,56 @@ func (pl *PluginNotImplementingScore) Name() string { return pluginNotImplementingScore } +// TestPreFilterPlugin only implements PreFilterPlugin interface. +type TestPreFilterPlugin struct { + PreFilterCalled int +} + +func (pl *TestPreFilterPlugin) Name() string { + return preFilterPluginName +} + +func (pl *TestPreFilterPlugin) PreFilter(pc *PluginContext, p *v1.Pod) *Status { + pl.PreFilterCalled++ + return nil +} + +func (pl *TestPreFilterPlugin) Updater() Updater { + return nil +} + +// TestPreFilterWithUpdatePlugin implements Add/Remove interfaces. +type TestPreFilterWithUpdaterPlugin struct { + PreFilterCalled int + AddCalled int + RemoveCalled int +} + +func (pl *TestPreFilterWithUpdaterPlugin) Name() string { + return preFilterWithUpdaterPluginName +} + +func (pl *TestPreFilterWithUpdaterPlugin) PreFilter(pc *PluginContext, p *v1.Pod) *Status { + pl.PreFilterCalled++ + return nil +} + +func (pl *TestPreFilterWithUpdaterPlugin) AddPod(pc *PluginContext, podToSchedule *v1.Pod, + podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status { + pl.AddCalled++ + return nil +} + +func (pl *TestPreFilterWithUpdaterPlugin) RemovePod(pc *PluginContext, podToSchedule *v1.Pod, + podToRemove *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status { + pl.RemoveCalled++ + return nil +} + +func (pl *TestPreFilterWithUpdaterPlugin) Updater() Updater { + return pl +} + var registry Registry = func() Registry { r := make(Registry) r.Register(scoreWithNormalizePlugin1, newScoreWithNormalizePlugin1) @@ -365,6 +418,44 @@ func TestRunScorePlugins(t *testing.T) { } } +func TestPreFilterPlugins(t *testing.T) { + preFilter1 := &TestPreFilterPlugin{} + preFilter2 := &TestPreFilterWithUpdaterPlugin{} + r := make(Registry) + r.Register(preFilterPluginName, + func(_ *runtime.Unknown, fh FrameworkHandle) (Plugin, error) { + return preFilter1, nil + }) + r.Register(preFilterWithUpdaterPluginName, + func(_ *runtime.Unknown, fh FrameworkHandle) (Plugin, error) { + return preFilter2, nil + }) + plugins := &config.Plugins{PreFilter: &config.PluginSet{Enabled: []config.Plugin{{Name: preFilterWithUpdaterPluginName}, {Name: preFilterPluginName}}}} + t.Run("TestPreFilterPlugin", func(t *testing.T) { + f, err := NewFramework(r, plugins, emptyArgs) + if err != nil { + t.Fatalf("Failed to create framework for testing: %v", err) + } + f.RunPreFilterPlugins(nil, nil) + f.RunPreFilterUpdaterAddPod(nil, nil, nil, nil) + f.RunPreFilterUpdaterRemovePod(nil, nil, nil, nil) + + if preFilter1.PreFilterCalled != 1 { + t.Errorf("preFilter1 called %v, expected: 1", preFilter1.PreFilterCalled) + } + if preFilter2.PreFilterCalled != 1 { + t.Errorf("preFilter2 called %v, expected: 1", preFilter2.PreFilterCalled) + } + if preFilter2.AddCalled != 1 { + t.Errorf("AddPod called %v, expected: 1", preFilter2.AddCalled) + } + if preFilter2.RemoveCalled != 1 { + t.Errorf("AddPod called %v, expected: 1", preFilter2.RemoveCalled) + } + }) + +} + func buildConfigDefaultWeights(ps ...string) *config.Plugins { return buildConfigWithWeights(defaultWeights, ps...) } diff --git a/pkg/scheduler/framework/v1alpha1/interface.go b/pkg/scheduler/framework/v1alpha1/interface.go index 25490ad32e5..99f865a8cb9 100644 --- a/pkg/scheduler/framework/v1alpha1/interface.go +++ b/pkg/scheduler/framework/v1alpha1/interface.go @@ -165,6 +165,18 @@ type QueueSortPlugin interface { Less(*PodInfo, *PodInfo) bool } +// Updater is an interface that is included in plugins that allow specifying +// callbacks to make incremental updates to its supposedly pre-calculated +// state. +type Updater interface { + // AddPod is called by the framework while trying to evaluate the impact + // of adding podToAdd to the node while scheduling podToSchedule. + AddPod(pc *PluginContext, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status + // RemovePod is called by the framework while trying to evaluate the impact + // of removing podToRemove from the node while scheduling podToSchedule. + RemovePod(pc *PluginContext, podToSchedule *v1.Pod, podToRemove *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status +} + // PreFilterPlugin is an interface that must be implemented by "prefilter" plugins. // These plugins are called at the beginning of the scheduling cycle. type PreFilterPlugin interface { @@ -172,6 +184,13 @@ type PreFilterPlugin interface { // PreFilter is called at the beginning of the scheduling cycle. All PreFilter // plugins must return success or the pod will be rejected. PreFilter(pc *PluginContext, p *v1.Pod) *Status + // Updater returns an updater if the plugin implements one, or nil if it + // does not. A Pre-filter plugin can provide an updater to incrementally + // modify its pre-processed info. The framework guarantees that the updater + // AddPod/RemovePod functions will only be called after PreFilter, + // possibly on a cloned PluginContext, and may call those functions more than + // once before calling Filter again on a specific node. + Updater() Updater } // FilterPlugin is an interface for Filter plugins. These plugins are called at the @@ -326,6 +345,16 @@ type Framework interface { // schedule the target pod. RunFilterPlugins(pc *PluginContext, pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status + // RunPreFilterUpdaterAddPod calls the AddPod interface for the set of configured + // PreFilter plugins. It returns directly if any of the plugins return any + // status other than Success. + RunPreFilterUpdaterAddPod(pc *PluginContext, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status + + // RunPreFilterUpdaterRemovePod calls the RemovePod interface for the set of configured + // PreFilter plugins. It returns directly if any of the plugins return any + // status other than Success. + RunPreFilterUpdaterRemovePod(pc *PluginContext, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status + // RunPostFilterPlugins runs the set of configured post-filter plugins. If any // of these plugins returns any status other than "Success", the given node is // rejected. The filteredNodeStatuses is the set of filtered nodes and their statuses. diff --git a/pkg/scheduler/internal/queue/scheduling_queue_test.go b/pkg/scheduler/internal/queue/scheduling_queue_test.go index 1e9c99bf319..1fc80e5dc7a 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue_test.go +++ b/pkg/scheduler/internal/queue/scheduling_queue_test.go @@ -175,6 +175,14 @@ func (*fakeFramework) RunFilterPlugins(pc *framework.PluginContext, pod *v1.Pod, return nil } +func (*fakeFramework) RunPreFilterUpdaterAddPod(pc *framework.PluginContext, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *framework.Status { + return nil +} + +func (*fakeFramework) RunPreFilterUpdaterRemovePod(pc *framework.PluginContext, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *framework.Status { + return nil +} + func (*fakeFramework) RunScorePlugins(pc *framework.PluginContext, pod *v1.Pod, nodes []*v1.Node) (framework.PluginToNodeScores, *framework.Status) { return nil, nil } diff --git a/test/integration/scheduler/framework_test.go b/test/integration/scheduler/framework_test.go index 93e43eb9aa1..a05bbb40e33 100644 --- a/test/integration/scheduler/framework_test.go +++ b/test/integration/scheduler/framework_test.go @@ -326,6 +326,11 @@ func (pp *PreFilterPlugin) Name() string { return prefilterPluginName } +// Updater returns the updater interface. +func (pp *PreFilterPlugin) Updater() framework.Updater { + return nil +} + // PreFilter is a test function that returns (true, nil) or errors for testing. func (pp *PreFilterPlugin) PreFilter(pc *framework.PluginContext, pod *v1.Pod) *framework.Status { pp.numPreFilterCalled++ diff --git a/test/integration/scheduler/preemption_test.go b/test/integration/scheduler/preemption_test.go index a07c41ab053..5ac8502668b 100644 --- a/test/integration/scheduler/preemption_test.go +++ b/test/integration/scheduler/preemption_test.go @@ -27,6 +27,7 @@ import ( policy "k8s.io/api/policy/v1beta1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" @@ -38,6 +39,9 @@ import ( podutil "k8s.io/kubernetes/pkg/api/v1/pod" "k8s.io/kubernetes/pkg/apis/scheduling" _ "k8s.io/kubernetes/pkg/scheduler/algorithmprovider" + schedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config" + framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" + schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo" "k8s.io/kubernetes/plugin/pkg/admission/priority" testutils "k8s.io/kubernetes/test/utils" @@ -66,10 +70,80 @@ func waitForNominatedNodeName(cs clientset.Interface, pod *v1.Pod) error { return waitForNominatedNodeNameWithTimeout(cs, pod, wait.ForeverTestTimeout) } +const tokenFilterName = "token-filter" + +type tokenFilter struct { + Tokens int + Unresolvable bool +} + +// Name returns name of the plugin. +func (fp *tokenFilter) Name() string { + return tokenFilterName +} + +func (fp *tokenFilter) Filter(pc *framework.PluginContext, pod *v1.Pod, + nodeInfo *schedulernodeinfo.NodeInfo) *framework.Status { + if fp.Tokens > 0 { + fp.Tokens-- + return nil + } + status := framework.Unschedulable + if fp.Unresolvable { + status = framework.UnschedulableAndUnresolvable + } + return framework.NewStatus(status, fmt.Sprintf("can't fit %v", pod.Name)) +} + +func (fp *tokenFilter) PreFilter(pc *framework.PluginContext, pod *v1.Pod) *framework.Status { + return nil +} + +func (fp *tokenFilter) AddPod(pc *framework.PluginContext, podToSchedule *v1.Pod, + podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *framework.Status { + fp.Tokens-- + return nil +} + +func (fp *tokenFilter) RemovePod(pc *framework.PluginContext, podToSchedule *v1.Pod, + podToRemove *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *framework.Status { + fp.Tokens++ + return nil +} + +func (fp *tokenFilter) Updater() framework.Updater { + return fp +} + +var _ = framework.FilterPlugin(&tokenFilter{}) + // TestPreemption tests a few preemption scenarios. func TestPreemption(t *testing.T) { - // Initialize scheduler. - context := initTest(t, "preemption") + // Initialize scheduler with a filter plugin. + var filter tokenFilter + registry := framework.Registry{filterPluginName: func(_ *runtime.Unknown, fh framework.FrameworkHandle) (framework.Plugin, error) { + return &filter, nil + }} + plugin := &schedulerconfig.Plugins{ + Filter: &schedulerconfig.PluginSet{ + Enabled: []schedulerconfig.Plugin{ + { + Name: filterPluginName, + }, + }, + }, + PreFilter: &schedulerconfig.PluginSet{ + Enabled: []schedulerconfig.Plugin{ + { + Name: filterPluginName, + }, + }, + }, + } + context := initTestSchedulerWithOptions(t, + initTestMaster(t, "preemptiom", nil), + false, nil, registry, plugin, []schedulerconfig.PluginConfig{}, time.Second) + defer cleanupTest(t, context) cs := context.clientSet @@ -78,14 +152,18 @@ func TestPreemption(t *testing.T) { v1.ResourceMemory: *resource.NewQuantity(100, resource.DecimalSI)}, } + maxTokens := 1000 tests := []struct { description string existingPods []*v1.Pod pod *v1.Pod + initTokens int + unresolvable bool preemptedPodIndexes map[int]struct{} }{ { description: "basic pod preemption", + initTokens: maxTokens, existingPods: []*v1.Pod{ initPausePod(context.clientSet, &pausePodConfig{ Name: "victim-pod", @@ -108,8 +186,61 @@ func TestPreemption(t *testing.T) { }), preemptedPodIndexes: map[int]struct{}{0: {}}, }, + { + description: "basic pod preemption with filter", + initTokens: 1, + existingPods: []*v1.Pod{ + initPausePod(context.clientSet, &pausePodConfig{ + Name: "victim-pod", + Namespace: context.ns.Name, + Priority: &lowPriority, + Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(200, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI)}, + }, + }), + }, + pod: initPausePod(cs, &pausePodConfig{ + Name: "preemptor-pod", + Namespace: context.ns.Name, + Priority: &highPriority, + Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(200, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI)}, + }, + }), + preemptedPodIndexes: map[int]struct{}{0: {}}, + }, + { + // same as the previous test, but the filter is unresolvable. + description: "basic pod preemption with unresolvable filter", + initTokens: 1, + unresolvable: true, + existingPods: []*v1.Pod{ + initPausePod(context.clientSet, &pausePodConfig{ + Name: "victim-pod", + Namespace: context.ns.Name, + Priority: &lowPriority, + Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(200, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI)}, + }, + }), + }, + pod: initPausePod(cs, &pausePodConfig{ + Name: "preemptor-pod", + Namespace: context.ns.Name, + Priority: &highPriority, + Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(200, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI)}, + }, + }), + preemptedPodIndexes: map[int]struct{}{}, + }, { description: "preemption is performed to satisfy anti-affinity", + initTokens: maxTokens, existingPods: []*v1.Pod{ initPausePod(cs, &pausePodConfig{ Name: "pod-0", Namespace: context.ns.Name, @@ -173,6 +304,7 @@ func TestPreemption(t *testing.T) { { // This is similar to the previous case only pod-1 is high priority. description: "preemption is not performed when anti-affinity is not satisfied", + initTokens: maxTokens, existingPods: []*v1.Pod{ initPausePod(cs, &pausePodConfig{ Name: "pod-0", Namespace: context.ns.Name, @@ -254,6 +386,8 @@ func TestPreemption(t *testing.T) { } for _, test := range tests { + filter.Tokens = test.initTokens + filter.Unresolvable = test.unresolvable pods := make([]*v1.Pod, len(test.existingPods)) // Create and run existingPods. for i, p := range test.existingPods {