From 37b9e6d1eadbf801b3dae4bdf7738b5eae39bf44 Mon Sep 17 00:00:00 2001 From: Abdullah Gharaibeh Date: Tue, 24 Sep 2019 12:39:27 -0400 Subject: [PATCH] An interface that allows pre-filter plugins to update their pre-calculated. This is needed to allow efficient preemption simulations: during preemption, we remove/add pods from each node before running the filter plugins again to evaluate whether removing/adding specific pods will allow the incoming pod to be scheduled on the node. Instead of calling prefilter again, we should allow the plugin to do incremental update to its pre-computed state. --- pkg/scheduler/core/generic_scheduler.go | 12 +- pkg/scheduler/framework/v1alpha1/BUILD | 1 + pkg/scheduler/framework/v1alpha1/context.go | 6 +- .../framework/v1alpha1/context_test.go | 8 + pkg/scheduler/framework/v1alpha1/framework.go | 40 +++++ .../framework/v1alpha1/framework_test.go | 99 ++++++++++++- pkg/scheduler/framework/v1alpha1/interface.go | 29 ++++ .../internal/queue/scheduling_queue_test.go | 8 + test/integration/scheduler/framework_test.go | 5 + test/integration/scheduler/preemption_test.go | 138 +++++++++++++++++- 10 files changed, 338 insertions(+), 8 deletions(-) diff --git a/pkg/scheduler/core/generic_scheduler.go b/pkg/scheduler/core/generic_scheduler.go index dd5b8fb423b..7617c122b34 100644 --- a/pkg/scheduler/core/generic_scheduler.go +++ b/pkg/scheduler/core/generic_scheduler.go @@ -1023,7 +1023,9 @@ func (g *genericScheduler) selectNodesForPreemption( if meta != nil { metaCopy = meta.ShallowCopy() } - pods, numPDBViolations, fits := g.selectVictimsOnNode(pluginContext, pod, metaCopy, nodeNameToInfo[nodeName], fitPredicates, queue, pdbs) + pluginContextClone := pluginContext.Clone() + pods, numPDBViolations, fits := g.selectVictimsOnNode( + pluginContextClone, pod, metaCopy, nodeNameToInfo[nodeName], fitPredicates, queue, pdbs) if fits { resultLock.Lock() victims := schedulerapi.Victims{ @@ -1116,6 +1118,10 @@ func (g *genericScheduler) selectVictimsOnNode( return err } } + status := g.framework.RunPreFilterUpdaterRemovePod(pluginContext, pod, rp, nodeInfoCopy) + if !status.IsSuccess() { + return status.AsError() + } return nil } addPod := func(ap *v1.Pod) error { @@ -1125,6 +1131,10 @@ func (g *genericScheduler) selectVictimsOnNode( return err } } + status := g.framework.RunPreFilterUpdaterAddPod(pluginContext, pod, ap, nodeInfoCopy) + if !status.IsSuccess() { + return status.AsError() + } return nil } // As the first step, remove all the lower priority pods from the node and diff --git a/pkg/scheduler/framework/v1alpha1/BUILD b/pkg/scheduler/framework/v1alpha1/BUILD index 4a3e532426a..71f141e6141 100644 --- a/pkg/scheduler/framework/v1alpha1/BUILD +++ b/pkg/scheduler/framework/v1alpha1/BUILD @@ -52,6 +52,7 @@ go_test( deps = [ "//pkg/scheduler/apis/config:go_default_library", "//pkg/scheduler/apis/config/scheme:go_default_library", + "//pkg/scheduler/nodeinfo:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", diff --git a/pkg/scheduler/framework/v1alpha1/context.go b/pkg/scheduler/framework/v1alpha1/context.go index 80667dd70cc..ade1661d5d2 100644 --- a/pkg/scheduler/framework/v1alpha1/context.go +++ b/pkg/scheduler/framework/v1alpha1/context.go @@ -53,8 +53,12 @@ func NewPluginContext() *PluginContext { } } -// Clone creates a copy of PluginContext and returns its pointer. +// Clone creates a copy of PluginContext and returns its pointer. Clone returns +// nil if the context being cloned is nil. func (c *PluginContext) Clone() *PluginContext { + if c == nil { + return nil + } copy := NewPluginContext() for k, v := range c.storage { copy.Write(k, v.Clone()) diff --git a/pkg/scheduler/framework/v1alpha1/context_test.go b/pkg/scheduler/framework/v1alpha1/context_test.go index 0ddac49ab81..ed4139f5ec7 100644 --- a/pkg/scheduler/framework/v1alpha1/context_test.go +++ b/pkg/scheduler/framework/v1alpha1/context_test.go @@ -64,3 +64,11 @@ func TestPluginContextClone(t *testing.T) { t.Errorf("cloned copy should not change, got %q, expected %q", v.data, data1) } } + +func TestPluginContextCloneNil(t *testing.T) { + var pc *PluginContext + pcCopy := pc.Clone() + if pcCopy != nil { + t.Errorf("clone expected to be nil") + } +} diff --git a/pkg/scheduler/framework/v1alpha1/framework.go b/pkg/scheduler/framework/v1alpha1/framework.go index 641f27d46ae..27f206ed96d 100644 --- a/pkg/scheduler/framework/v1alpha1/framework.go +++ b/pkg/scheduler/framework/v1alpha1/framework.go @@ -306,6 +306,46 @@ func (f *framework) RunPreFilterPlugins( return nil } +// RunPreFilterUpdaterAddPod calls the AddPod interface for the set of configured +// PreFilter plugins. It returns directly if any of the plugins return any +// status other than Success. +func (f *framework) RunPreFilterUpdaterAddPod(pc *PluginContext, podToSchedule *v1.Pod, + podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status { + for _, pl := range f.preFilterPlugins { + if updater := pl.Updater(); updater != nil { + status := updater.AddPod(pc, podToSchedule, podToAdd, nodeInfo) + if !status.IsSuccess() { + msg := fmt.Sprintf("error while running AddPod for plugin %q while scheduling pod %q: %v", + pl.Name(), podToSchedule.Name, status.Message()) + klog.Error(msg) + return NewStatus(Error, msg) + } + } + } + + return nil +} + +// RunPreFilterUpdaterRemovePod calls the RemovePod interface for the set of configured +// PreFilter plugins. It returns directly if any of the plugins return any +// status other than Success. +func (f *framework) RunPreFilterUpdaterRemovePod(pc *PluginContext, podToSchedule *v1.Pod, + podToRemove *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status { + for _, pl := range f.preFilterPlugins { + if updater := pl.Updater(); updater != nil { + status := updater.RemovePod(pc, podToSchedule, podToRemove, nodeInfo) + if !status.IsSuccess() { + msg := fmt.Sprintf("error while running RemovePod for plugin %q while scheduling pod %q: %v", + pl.Name(), podToSchedule.Name, status.Message()) + klog.Error(msg) + return NewStatus(Error, msg) + } + } + } + + return nil +} + // RunFilterPlugins runs the set of configured Filter plugins for pod on // the given node. If any of these plugins doesn't return "Success", the // given node is not suitable for running pod. diff --git a/pkg/scheduler/framework/v1alpha1/framework_test.go b/pkg/scheduler/framework/v1alpha1/framework_test.go index df5cd1ab8f8..1c8dba9a714 100644 --- a/pkg/scheduler/framework/v1alpha1/framework_test.go +++ b/pkg/scheduler/framework/v1alpha1/framework_test.go @@ -25,13 +25,16 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/kubernetes/pkg/scheduler/apis/config" + schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo" ) const ( - scoreWithNormalizePlugin1 = "score-with-normalize-plugin-1" - scoreWithNormalizePlugin2 = "score-with-normalize-plugin-2" - scorePlugin1 = "score-plugin-1" - pluginNotImplementingScore = "plugin-not-implementing-score" + scoreWithNormalizePlugin1 = "score-with-normalize-plugin-1" + scoreWithNormalizePlugin2 = "score-with-normalize-plugin-2" + scorePlugin1 = "score-plugin-1" + pluginNotImplementingScore = "plugin-not-implementing-score" + preFilterPluginName = "prefilter-plugin" + preFilterWithUpdaterPluginName = "prefilter-with-updater-plugin" ) // TestScoreWithNormalizePlugin implements ScoreWithNormalizePlugin interface. @@ -105,6 +108,56 @@ func (pl *PluginNotImplementingScore) Name() string { return pluginNotImplementingScore } +// TestPreFilterPlugin only implements PreFilterPlugin interface. +type TestPreFilterPlugin struct { + PreFilterCalled int +} + +func (pl *TestPreFilterPlugin) Name() string { + return preFilterPluginName +} + +func (pl *TestPreFilterPlugin) PreFilter(pc *PluginContext, p *v1.Pod) *Status { + pl.PreFilterCalled++ + return nil +} + +func (pl *TestPreFilterPlugin) Updater() Updater { + return nil +} + +// TestPreFilterWithUpdatePlugin implements Add/Remove interfaces. +type TestPreFilterWithUpdaterPlugin struct { + PreFilterCalled int + AddCalled int + RemoveCalled int +} + +func (pl *TestPreFilterWithUpdaterPlugin) Name() string { + return preFilterWithUpdaterPluginName +} + +func (pl *TestPreFilterWithUpdaterPlugin) PreFilter(pc *PluginContext, p *v1.Pod) *Status { + pl.PreFilterCalled++ + return nil +} + +func (pl *TestPreFilterWithUpdaterPlugin) AddPod(pc *PluginContext, podToSchedule *v1.Pod, + podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status { + pl.AddCalled++ + return nil +} + +func (pl *TestPreFilterWithUpdaterPlugin) RemovePod(pc *PluginContext, podToSchedule *v1.Pod, + podToRemove *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status { + pl.RemoveCalled++ + return nil +} + +func (pl *TestPreFilterWithUpdaterPlugin) Updater() Updater { + return pl +} + var registry Registry = func() Registry { r := make(Registry) r.Register(scoreWithNormalizePlugin1, newScoreWithNormalizePlugin1) @@ -365,6 +418,44 @@ func TestRunScorePlugins(t *testing.T) { } } +func TestPreFilterPlugins(t *testing.T) { + preFilter1 := &TestPreFilterPlugin{} + preFilter2 := &TestPreFilterWithUpdaterPlugin{} + r := make(Registry) + r.Register(preFilterPluginName, + func(_ *runtime.Unknown, fh FrameworkHandle) (Plugin, error) { + return preFilter1, nil + }) + r.Register(preFilterWithUpdaterPluginName, + func(_ *runtime.Unknown, fh FrameworkHandle) (Plugin, error) { + return preFilter2, nil + }) + plugins := &config.Plugins{PreFilter: &config.PluginSet{Enabled: []config.Plugin{{Name: preFilterWithUpdaterPluginName}, {Name: preFilterPluginName}}}} + t.Run("TestPreFilterPlugin", func(t *testing.T) { + f, err := NewFramework(r, plugins, emptyArgs) + if err != nil { + t.Fatalf("Failed to create framework for testing: %v", err) + } + f.RunPreFilterPlugins(nil, nil) + f.RunPreFilterUpdaterAddPod(nil, nil, nil, nil) + f.RunPreFilterUpdaterRemovePod(nil, nil, nil, nil) + + if preFilter1.PreFilterCalled != 1 { + t.Errorf("preFilter1 called %v, expected: 1", preFilter1.PreFilterCalled) + } + if preFilter2.PreFilterCalled != 1 { + t.Errorf("preFilter2 called %v, expected: 1", preFilter2.PreFilterCalled) + } + if preFilter2.AddCalled != 1 { + t.Errorf("AddPod called %v, expected: 1", preFilter2.AddCalled) + } + if preFilter2.RemoveCalled != 1 { + t.Errorf("AddPod called %v, expected: 1", preFilter2.RemoveCalled) + } + }) + +} + func buildConfigDefaultWeights(ps ...string) *config.Plugins { return buildConfigWithWeights(defaultWeights, ps...) } diff --git a/pkg/scheduler/framework/v1alpha1/interface.go b/pkg/scheduler/framework/v1alpha1/interface.go index 25490ad32e5..99f865a8cb9 100644 --- a/pkg/scheduler/framework/v1alpha1/interface.go +++ b/pkg/scheduler/framework/v1alpha1/interface.go @@ -165,6 +165,18 @@ type QueueSortPlugin interface { Less(*PodInfo, *PodInfo) bool } +// Updater is an interface that is included in plugins that allow specifying +// callbacks to make incremental updates to its supposedly pre-calculated +// state. +type Updater interface { + // AddPod is called by the framework while trying to evaluate the impact + // of adding podToAdd to the node while scheduling podToSchedule. + AddPod(pc *PluginContext, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status + // RemovePod is called by the framework while trying to evaluate the impact + // of removing podToRemove from the node while scheduling podToSchedule. + RemovePod(pc *PluginContext, podToSchedule *v1.Pod, podToRemove *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status +} + // PreFilterPlugin is an interface that must be implemented by "prefilter" plugins. // These plugins are called at the beginning of the scheduling cycle. type PreFilterPlugin interface { @@ -172,6 +184,13 @@ type PreFilterPlugin interface { // PreFilter is called at the beginning of the scheduling cycle. All PreFilter // plugins must return success or the pod will be rejected. PreFilter(pc *PluginContext, p *v1.Pod) *Status + // Updater returns an updater if the plugin implements one, or nil if it + // does not. A Pre-filter plugin can provide an updater to incrementally + // modify its pre-processed info. The framework guarantees that the updater + // AddPod/RemovePod functions will only be called after PreFilter, + // possibly on a cloned PluginContext, and may call those functions more than + // once before calling Filter again on a specific node. + Updater() Updater } // FilterPlugin is an interface for Filter plugins. These plugins are called at the @@ -326,6 +345,16 @@ type Framework interface { // schedule the target pod. RunFilterPlugins(pc *PluginContext, pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status + // RunPreFilterUpdaterAddPod calls the AddPod interface for the set of configured + // PreFilter plugins. It returns directly if any of the plugins return any + // status other than Success. + RunPreFilterUpdaterAddPod(pc *PluginContext, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status + + // RunPreFilterUpdaterRemovePod calls the RemovePod interface for the set of configured + // PreFilter plugins. It returns directly if any of the plugins return any + // status other than Success. + RunPreFilterUpdaterRemovePod(pc *PluginContext, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status + // RunPostFilterPlugins runs the set of configured post-filter plugins. If any // of these plugins returns any status other than "Success", the given node is // rejected. The filteredNodeStatuses is the set of filtered nodes and their statuses. diff --git a/pkg/scheduler/internal/queue/scheduling_queue_test.go b/pkg/scheduler/internal/queue/scheduling_queue_test.go index c1a79712a76..6334668119c 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue_test.go +++ b/pkg/scheduler/internal/queue/scheduling_queue_test.go @@ -175,6 +175,14 @@ func (*fakeFramework) RunFilterPlugins(pc *framework.PluginContext, pod *v1.Pod, return nil } +func (*fakeFramework) RunPreFilterUpdaterAddPod(pc *framework.PluginContext, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *framework.Status { + return nil +} + +func (*fakeFramework) RunPreFilterUpdaterRemovePod(pc *framework.PluginContext, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *framework.Status { + return nil +} + func (*fakeFramework) RunScorePlugins(pc *framework.PluginContext, pod *v1.Pod, nodes []*v1.Node) (framework.PluginToNodeScores, *framework.Status) { return nil, nil } diff --git a/test/integration/scheduler/framework_test.go b/test/integration/scheduler/framework_test.go index 93e43eb9aa1..a05bbb40e33 100644 --- a/test/integration/scheduler/framework_test.go +++ b/test/integration/scheduler/framework_test.go @@ -326,6 +326,11 @@ func (pp *PreFilterPlugin) Name() string { return prefilterPluginName } +// Updater returns the updater interface. +func (pp *PreFilterPlugin) Updater() framework.Updater { + return nil +} + // PreFilter is a test function that returns (true, nil) or errors for testing. func (pp *PreFilterPlugin) PreFilter(pc *framework.PluginContext, pod *v1.Pod) *framework.Status { pp.numPreFilterCalled++ diff --git a/test/integration/scheduler/preemption_test.go b/test/integration/scheduler/preemption_test.go index a07c41ab053..5ac8502668b 100644 --- a/test/integration/scheduler/preemption_test.go +++ b/test/integration/scheduler/preemption_test.go @@ -27,6 +27,7 @@ import ( policy "k8s.io/api/policy/v1beta1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" @@ -38,6 +39,9 @@ import ( podutil "k8s.io/kubernetes/pkg/api/v1/pod" "k8s.io/kubernetes/pkg/apis/scheduling" _ "k8s.io/kubernetes/pkg/scheduler/algorithmprovider" + schedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config" + framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" + schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo" "k8s.io/kubernetes/plugin/pkg/admission/priority" testutils "k8s.io/kubernetes/test/utils" @@ -66,10 +70,80 @@ func waitForNominatedNodeName(cs clientset.Interface, pod *v1.Pod) error { return waitForNominatedNodeNameWithTimeout(cs, pod, wait.ForeverTestTimeout) } +const tokenFilterName = "token-filter" + +type tokenFilter struct { + Tokens int + Unresolvable bool +} + +// Name returns name of the plugin. +func (fp *tokenFilter) Name() string { + return tokenFilterName +} + +func (fp *tokenFilter) Filter(pc *framework.PluginContext, pod *v1.Pod, + nodeInfo *schedulernodeinfo.NodeInfo) *framework.Status { + if fp.Tokens > 0 { + fp.Tokens-- + return nil + } + status := framework.Unschedulable + if fp.Unresolvable { + status = framework.UnschedulableAndUnresolvable + } + return framework.NewStatus(status, fmt.Sprintf("can't fit %v", pod.Name)) +} + +func (fp *tokenFilter) PreFilter(pc *framework.PluginContext, pod *v1.Pod) *framework.Status { + return nil +} + +func (fp *tokenFilter) AddPod(pc *framework.PluginContext, podToSchedule *v1.Pod, + podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *framework.Status { + fp.Tokens-- + return nil +} + +func (fp *tokenFilter) RemovePod(pc *framework.PluginContext, podToSchedule *v1.Pod, + podToRemove *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *framework.Status { + fp.Tokens++ + return nil +} + +func (fp *tokenFilter) Updater() framework.Updater { + return fp +} + +var _ = framework.FilterPlugin(&tokenFilter{}) + // TestPreemption tests a few preemption scenarios. func TestPreemption(t *testing.T) { - // Initialize scheduler. - context := initTest(t, "preemption") + // Initialize scheduler with a filter plugin. + var filter tokenFilter + registry := framework.Registry{filterPluginName: func(_ *runtime.Unknown, fh framework.FrameworkHandle) (framework.Plugin, error) { + return &filter, nil + }} + plugin := &schedulerconfig.Plugins{ + Filter: &schedulerconfig.PluginSet{ + Enabled: []schedulerconfig.Plugin{ + { + Name: filterPluginName, + }, + }, + }, + PreFilter: &schedulerconfig.PluginSet{ + Enabled: []schedulerconfig.Plugin{ + { + Name: filterPluginName, + }, + }, + }, + } + context := initTestSchedulerWithOptions(t, + initTestMaster(t, "preemptiom", nil), + false, nil, registry, plugin, []schedulerconfig.PluginConfig{}, time.Second) + defer cleanupTest(t, context) cs := context.clientSet @@ -78,14 +152,18 @@ func TestPreemption(t *testing.T) { v1.ResourceMemory: *resource.NewQuantity(100, resource.DecimalSI)}, } + maxTokens := 1000 tests := []struct { description string existingPods []*v1.Pod pod *v1.Pod + initTokens int + unresolvable bool preemptedPodIndexes map[int]struct{} }{ { description: "basic pod preemption", + initTokens: maxTokens, existingPods: []*v1.Pod{ initPausePod(context.clientSet, &pausePodConfig{ Name: "victim-pod", @@ -108,8 +186,61 @@ func TestPreemption(t *testing.T) { }), preemptedPodIndexes: map[int]struct{}{0: {}}, }, + { + description: "basic pod preemption with filter", + initTokens: 1, + existingPods: []*v1.Pod{ + initPausePod(context.clientSet, &pausePodConfig{ + Name: "victim-pod", + Namespace: context.ns.Name, + Priority: &lowPriority, + Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(200, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI)}, + }, + }), + }, + pod: initPausePod(cs, &pausePodConfig{ + Name: "preemptor-pod", + Namespace: context.ns.Name, + Priority: &highPriority, + Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(200, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI)}, + }, + }), + preemptedPodIndexes: map[int]struct{}{0: {}}, + }, + { + // same as the previous test, but the filter is unresolvable. + description: "basic pod preemption with unresolvable filter", + initTokens: 1, + unresolvable: true, + existingPods: []*v1.Pod{ + initPausePod(context.clientSet, &pausePodConfig{ + Name: "victim-pod", + Namespace: context.ns.Name, + Priority: &lowPriority, + Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(200, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI)}, + }, + }), + }, + pod: initPausePod(cs, &pausePodConfig{ + Name: "preemptor-pod", + Namespace: context.ns.Name, + Priority: &highPriority, + Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(200, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI)}, + }, + }), + preemptedPodIndexes: map[int]struct{}{}, + }, { description: "preemption is performed to satisfy anti-affinity", + initTokens: maxTokens, existingPods: []*v1.Pod{ initPausePod(cs, &pausePodConfig{ Name: "pod-0", Namespace: context.ns.Name, @@ -173,6 +304,7 @@ func TestPreemption(t *testing.T) { { // This is similar to the previous case only pod-1 is high priority. description: "preemption is not performed when anti-affinity is not satisfied", + initTokens: maxTokens, existingPods: []*v1.Pod{ initPausePod(cs, &pausePodConfig{ Name: "pod-0", Namespace: context.ns.Name, @@ -254,6 +386,8 @@ func TestPreemption(t *testing.T) { } for _, test := range tests { + filter.Tokens = test.initTokens + filter.Unresolvable = test.unresolvable pods := make([]*v1.Pod, len(test.existingPods)) // Create and run existingPods. for i, p := range test.existingPods {