An interface that allows pre-filter plugins to update their pre-calculated state.

This is needed to allow efficient preemption simulations: during preemption, we remove/add pods from each node before running the filter plugins again to evaluate whether removing/adding specific pods will allow the incoming pod to be scheduled on the node. Instead of calling prefilter again, we should allow the plugin to make incremental updates to its pre-computed state.
This commit is contained in:
Abdullah Gharaibeh
2019-09-24 12:39:27 -04:00
parent 512eccac1f
commit 37b9e6d1ea
10 changed files with 338 additions and 8 deletions

View File

@@ -1023,7 +1023,9 @@ func (g *genericScheduler) selectNodesForPreemption(
if meta != nil {
metaCopy = meta.ShallowCopy()
}
pods, numPDBViolations, fits := g.selectVictimsOnNode(pluginContext, pod, metaCopy, nodeNameToInfo[nodeName], fitPredicates, queue, pdbs)
pluginContextClone := pluginContext.Clone()
pods, numPDBViolations, fits := g.selectVictimsOnNode(
pluginContextClone, pod, metaCopy, nodeNameToInfo[nodeName], fitPredicates, queue, pdbs)
if fits {
resultLock.Lock()
victims := schedulerapi.Victims{
@@ -1116,6 +1118,10 @@ func (g *genericScheduler) selectVictimsOnNode(
return err
}
}
status := g.framework.RunPreFilterUpdaterRemovePod(pluginContext, pod, rp, nodeInfoCopy)
if !status.IsSuccess() {
return status.AsError()
}
return nil
}
addPod := func(ap *v1.Pod) error {
@@ -1125,6 +1131,10 @@ func (g *genericScheduler) selectVictimsOnNode(
return err
}
}
status := g.framework.RunPreFilterUpdaterAddPod(pluginContext, pod, ap, nodeInfoCopy)
if !status.IsSuccess() {
return status.AsError()
}
return nil
}
// As the first step, remove all the lower priority pods from the node and

View File

@@ -52,6 +52,7 @@ go_test(
deps = [
"//pkg/scheduler/apis/config:go_default_library",
"//pkg/scheduler/apis/config/scheme:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",

View File

@@ -53,8 +53,12 @@ func NewPluginContext() *PluginContext {
}
}
// Clone creates a copy of PluginContext and returns its pointer.
// Clone creates a copy of PluginContext and returns its pointer. Clone returns
// nil if the context being cloned is nil.
func (c *PluginContext) Clone() *PluginContext {
if c == nil {
return nil
}
copy := NewPluginContext()
for k, v := range c.storage {
copy.Write(k, v.Clone())

View File

@@ -64,3 +64,11 @@ func TestPluginContextClone(t *testing.T) {
t.Errorf("cloned copy should not change, got %q, expected %q", v.data, data1)
}
}
// TestPluginContextCloneNil verifies that calling Clone on a nil
// *PluginContext yields nil.
func TestPluginContextCloneNil(t *testing.T) {
	var nilContext *PluginContext
	if cloned := nilContext.Clone(); cloned != nil {
		t.Errorf("clone expected to be nil")
	}
}

View File

@@ -306,6 +306,46 @@ func (f *framework) RunPreFilterPlugins(
return nil
}
// RunPreFilterUpdaterAddPod walks the configured PreFilter plugins and, for
// each one that exposes an Updater, calls its AddPod callback. The first
// non-Success status stops the walk: it is logged and reported to the caller
// as an Error status. A nil status is returned when every callback succeeds.
func (f *framework) RunPreFilterUpdaterAddPod(pc *PluginContext, podToSchedule *v1.Pod,
	podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status {
	for _, plugin := range f.preFilterPlugins {
		updater := plugin.Updater()
		if updater == nil {
			// This plugin does not provide an updater; nothing to call.
			continue
		}
		result := updater.AddPod(pc, podToSchedule, podToAdd, nodeInfo)
		if result.IsSuccess() {
			continue
		}
		errMsg := fmt.Sprintf("error while running AddPod for plugin %q while scheduling pod %q: %v",
			plugin.Name(), podToSchedule.Name, result.Message())
		klog.Error(errMsg)
		return NewStatus(Error, errMsg)
	}
	return nil
}
// RunPreFilterUpdaterRemovePod walks the configured PreFilter plugins and, for
// each one that exposes an Updater, calls its RemovePod callback. The first
// non-Success status stops the walk: it is logged and reported to the caller
// as an Error status. A nil status is returned when every callback succeeds.
func (f *framework) RunPreFilterUpdaterRemovePod(pc *PluginContext, podToSchedule *v1.Pod,
	podToRemove *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status {
	for _, plugin := range f.preFilterPlugins {
		updater := plugin.Updater()
		if updater == nil {
			// This plugin does not provide an updater; nothing to call.
			continue
		}
		result := updater.RemovePod(pc, podToSchedule, podToRemove, nodeInfo)
		if result.IsSuccess() {
			continue
		}
		errMsg := fmt.Sprintf("error while running RemovePod for plugin %q while scheduling pod %q: %v",
			plugin.Name(), podToSchedule.Name, result.Message())
		klog.Error(errMsg)
		return NewStatus(Error, errMsg)
	}
	return nil
}
// RunFilterPlugins runs the set of configured Filter plugins for pod on
// the given node. If any of these plugins doesn't return "Success", the
// given node is not suitable for running pod.

View File

@@ -25,13 +25,16 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
)
const (
scoreWithNormalizePlugin1 = "score-with-normalize-plugin-1"
scoreWithNormalizePlugin2 = "score-with-normalize-plugin-2"
scorePlugin1 = "score-plugin-1"
pluginNotImplementingScore = "plugin-not-implementing-score"
scoreWithNormalizePlugin1 = "score-with-normalize-plugin-1"
scoreWithNormalizePlugin2 = "score-with-normalize-plugin-2"
scorePlugin1 = "score-plugin-1"
pluginNotImplementingScore = "plugin-not-implementing-score"
preFilterPluginName = "prefilter-plugin"
preFilterWithUpdaterPluginName = "prefilter-with-updater-plugin"
)
// TestScoreWithNormalizePlugin implements ScoreWithNormalizePlugin interface.
@@ -105,6 +108,56 @@ func (pl *PluginNotImplementingScore) Name() string {
return pluginNotImplementingScore
}
// TestPreFilterPlugin only implements PreFilterPlugin interface; it does not
// provide an Updater.
type TestPreFilterPlugin struct {
	// PreFilterCalled counts how many times PreFilter has been invoked.
	PreFilterCalled int
}

// Name returns preFilterPluginName.
func (p *TestPreFilterPlugin) Name() string {
	return preFilterPluginName
}

// PreFilter records the invocation and returns a nil status.
func (p *TestPreFilterPlugin) PreFilter(pc *PluginContext, pod *v1.Pod) *Status {
	p.PreFilterCalled++
	return nil
}

// Updater returns nil because this plugin has no AddPod/RemovePod callbacks.
func (p *TestPreFilterPlugin) Updater() Updater {
	return nil
}
// TestPreFilterWithUpdaterPlugin implements the PreFilter method together with
// the Updater Add/Remove callbacks, counting every invocation.
type TestPreFilterWithUpdaterPlugin struct {
	// PreFilterCalled counts how many times PreFilter has been invoked.
	PreFilterCalled int
	// AddCalled counts how many times AddPod has been invoked.
	AddCalled int
	// RemoveCalled counts how many times RemovePod has been invoked.
	RemoveCalled int
}

// Name returns preFilterWithUpdaterPluginName.
func (pl *TestPreFilterWithUpdaterPlugin) Name() string {
	return preFilterWithUpdaterPluginName
}

// PreFilter records the invocation and returns a nil status.
func (pl *TestPreFilterWithUpdaterPlugin) PreFilter(pc *PluginContext, p *v1.Pod) *Status {
	pl.PreFilterCalled++
	return nil
}

// AddPod records the invocation and returns a nil status.
func (pl *TestPreFilterWithUpdaterPlugin) AddPod(pc *PluginContext, podToSchedule *v1.Pod,
	podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status {
	pl.AddCalled++
	return nil
}

// RemovePod records the invocation and returns a nil status.
func (pl *TestPreFilterWithUpdaterPlugin) RemovePod(pc *PluginContext, podToSchedule *v1.Pod,
	podToRemove *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status {
	pl.RemoveCalled++
	return nil
}

// Updater returns the plugin itself, since it implements AddPod and RemovePod.
func (pl *TestPreFilterWithUpdaterPlugin) Updater() Updater {
	return pl
}
var registry Registry = func() Registry {
r := make(Registry)
r.Register(scoreWithNormalizePlugin1, newScoreWithNormalizePlugin1)
@@ -365,6 +418,44 @@ func TestRunScorePlugins(t *testing.T) {
}
}
// TestPreFilterPlugins registers one PreFilter plugin without an updater and
// one with an updater, runs PreFilter and the AddPod/RemovePod updater hooks
// once each, and checks the call counters recorded by the two plugins.
func TestPreFilterPlugins(t *testing.T) {
	preFilter1 := &TestPreFilterPlugin{}
	preFilter2 := &TestPreFilterWithUpdaterPlugin{}
	r := make(Registry)
	r.Register(preFilterPluginName,
		func(_ *runtime.Unknown, fh FrameworkHandle) (Plugin, error) {
			return preFilter1, nil
		})
	r.Register(preFilterWithUpdaterPluginName,
		func(_ *runtime.Unknown, fh FrameworkHandle) (Plugin, error) {
			return preFilter2, nil
		})
	plugins := &config.Plugins{PreFilter: &config.PluginSet{Enabled: []config.Plugin{{Name: preFilterWithUpdaterPluginName}, {Name: preFilterPluginName}}}}
	t.Run("TestPreFilterPlugin", func(t *testing.T) {
		f, err := NewFramework(r, plugins, emptyArgs)
		if err != nil {
			t.Fatalf("Failed to create framework for testing: %v", err)
		}
		f.RunPreFilterPlugins(nil, nil)
		f.RunPreFilterUpdaterAddPod(nil, nil, nil, nil)
		f.RunPreFilterUpdaterRemovePod(nil, nil, nil, nil)

		if preFilter1.PreFilterCalled != 1 {
			t.Errorf("preFilter1 called %v, expected: 1", preFilter1.PreFilterCalled)
		}
		if preFilter2.PreFilterCalled != 1 {
			t.Errorf("preFilter2 called %v, expected: 1", preFilter2.PreFilterCalled)
		}
		if preFilter2.AddCalled != 1 {
			t.Errorf("AddPod called %v, expected: 1", preFilter2.AddCalled)
		}
		// Report the RemovePod counter under its own name so a failure here
		// is not mistaken for an AddPod failure.
		if preFilter2.RemoveCalled != 1 {
			t.Errorf("RemovePod called %v, expected: 1", preFilter2.RemoveCalled)
		}
	})
}
func buildConfigDefaultWeights(ps ...string) *config.Plugins {
return buildConfigWithWeights(defaultWeights, ps...)
}

View File

@@ -165,6 +165,18 @@ type QueueSortPlugin interface {
Less(*PodInfo, *PodInfo) bool
}
// Updater is an interface that a plugin can provide to expose callbacks
// through which the framework makes incremental updates to the plugin's
// pre-calculated state.
type Updater interface {
// AddPod is called by the framework while trying to evaluate the impact
// of adding podToAdd to the node while scheduling podToSchedule.
AddPod(pc *PluginContext, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status
// RemovePod is called by the framework while trying to evaluate the impact
// of removing podToRemove from the node while scheduling podToSchedule.
RemovePod(pc *PluginContext, podToSchedule *v1.Pod, podToRemove *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status
}
// PreFilterPlugin is an interface that must be implemented by "prefilter" plugins.
// These plugins are called at the beginning of the scheduling cycle.
type PreFilterPlugin interface {
@@ -172,6 +184,13 @@ type PreFilterPlugin interface {
// PreFilter is called at the beginning of the scheduling cycle. All PreFilter
// plugins must return success or the pod will be rejected.
PreFilter(pc *PluginContext, p *v1.Pod) *Status
// Updater returns an updater if the plugin implements one, or nil if it
// does not. A Pre-filter plugin can provide an updater to incrementally
// modify its pre-processed info. The framework guarantees that the updater
// AddPod/RemovePod functions will only be called after PreFilter,
// possibly on a cloned PluginContext, and may call those functions more than
// once before calling Filter again on a specific node.
Updater() Updater
}
// FilterPlugin is an interface for Filter plugins. These plugins are called at the
@@ -326,6 +345,16 @@ type Framework interface {
// schedule the target pod.
RunFilterPlugins(pc *PluginContext, pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status
// RunPreFilterUpdaterAddPod calls the AddPod interface for the set of configured
// PreFilter plugins. It returns directly if any of the plugins return any
// status other than Success.
RunPreFilterUpdaterAddPod(pc *PluginContext, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status
// RunPreFilterUpdaterRemovePod calls the RemovePod interface for the set of configured
// PreFilter plugins. It returns directly if any of the plugins return any
// status other than Success.
RunPreFilterUpdaterRemovePod(pc *PluginContext, podToSchedule *v1.Pod, podToRemove *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status
// RunPostFilterPlugins runs the set of configured post-filter plugins. If any
// of these plugins returns any status other than "Success", the given node is
// rejected. The filteredNodeStatuses is the set of filtered nodes and their statuses.

View File

@@ -175,6 +175,14 @@ func (*fakeFramework) RunFilterPlugins(pc *framework.PluginContext, pod *v1.Pod,
return nil
}
// RunPreFilterUpdaterAddPod is a stub that does nothing and always returns a
// nil status.
func (*fakeFramework) RunPreFilterUpdaterAddPod(
	pc *framework.PluginContext,
	podToSchedule *v1.Pod,
	podToAdd *v1.Pod,
	nodeInfo *schedulernodeinfo.NodeInfo,
) *framework.Status {
	return nil
}
// RunPreFilterUpdaterRemovePod is a stub that does nothing and always returns
// a nil status.
func (*fakeFramework) RunPreFilterUpdaterRemovePod(pc *framework.PluginContext, podToSchedule *v1.Pod, podToRemove *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *framework.Status {
	return nil
}
func (*fakeFramework) RunScorePlugins(pc *framework.PluginContext, pod *v1.Pod, nodes []*v1.Node) (framework.PluginToNodeScores, *framework.Status) {
return nil, nil
}

View File

@@ -326,6 +326,11 @@ func (pp *PreFilterPlugin) Name() string {
return prefilterPluginName
}
// Updater returns nil; this test plugin does not supply an updater.
func (pp *PreFilterPlugin) Updater() framework.Updater {
return nil
}
// PreFilter is a test function that returns (true, nil) or errors for testing.
func (pp *PreFilterPlugin) PreFilter(pc *framework.PluginContext, pod *v1.Pod) *framework.Status {
pp.numPreFilterCalled++

View File

@@ -27,6 +27,7 @@ import (
policy "k8s.io/api/policy/v1beta1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
@@ -38,6 +39,9 @@ import (
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/apis/scheduling"
_ "k8s.io/kubernetes/pkg/scheduler/algorithmprovider"
schedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
"k8s.io/kubernetes/plugin/pkg/admission/priority"
testutils "k8s.io/kubernetes/test/utils"
@@ -66,10 +70,80 @@ func waitForNominatedNodeName(cs clientset.Interface, pod *v1.Pod) error {
return waitForNominatedNodeNameWithTimeout(cs, pod, wait.ForeverTestTimeout)
}
// tokenFilterName is the name reported by tokenFilter.Name.
const tokenFilterName = "token-filter"

// tokenFilter is a test plugin whose Filter succeeds only while it holds
// tokens. Its AddPod and RemovePod callbacks adjust the token count.
type tokenFilter struct {
	// Tokens is the number of Filter calls that may still succeed.
	Tokens int
	// Unresolvable makes rejections use UnschedulableAndUnresolvable instead
	// of Unschedulable.
	Unresolvable bool
}

// Name returns name of the plugin.
func (f *tokenFilter) Name() string {
	return tokenFilterName
}

// Filter consumes one token and returns a nil status while Tokens is
// positive. Once no tokens remain it rejects the pod, with the status code
// chosen by the Unresolvable flag.
func (f *tokenFilter) Filter(pc *framework.PluginContext, pod *v1.Pod,
	nodeInfo *schedulernodeinfo.NodeInfo) *framework.Status {
	if f.Tokens <= 0 {
		code := framework.Unschedulable
		if f.Unresolvable {
			code = framework.UnschedulableAndUnresolvable
		}
		return framework.NewStatus(code, fmt.Sprintf("can't fit %v", pod.Name))
	}
	f.Tokens--
	return nil
}

// PreFilter does nothing and returns a nil status.
func (f *tokenFilter) PreFilter(pc *framework.PluginContext, pod *v1.Pod) *framework.Status {
	return nil
}

// AddPod takes one token away and returns a nil status.
func (f *tokenFilter) AddPod(pc *framework.PluginContext, podToSchedule *v1.Pod,
	podToAdd *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *framework.Status {
	f.Tokens--
	return nil
}

// RemovePod gives one token back and returns a nil status.
func (f *tokenFilter) RemovePod(pc *framework.PluginContext, podToSchedule *v1.Pod,
	podToRemove *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *framework.Status {
	f.Tokens++
	return nil
}

// Updater returns the filter itself, since it implements AddPod and RemovePod.
func (f *tokenFilter) Updater() framework.Updater {
	return f
}

// Compile-time check that tokenFilter satisfies framework.FilterPlugin.
var _ framework.FilterPlugin = &tokenFilter{}
// TestPreemption tests a few preemption scenarios.
func TestPreemption(t *testing.T) {
// Initialize scheduler.
context := initTest(t, "preemption")
// Initialize scheduler with a filter plugin.
var filter tokenFilter
registry := framework.Registry{filterPluginName: func(_ *runtime.Unknown, fh framework.FrameworkHandle) (framework.Plugin, error) {
return &filter, nil
}}
plugin := &schedulerconfig.Plugins{
Filter: &schedulerconfig.PluginSet{
Enabled: []schedulerconfig.Plugin{
{
Name: filterPluginName,
},
},
},
PreFilter: &schedulerconfig.PluginSet{
Enabled: []schedulerconfig.Plugin{
{
Name: filterPluginName,
},
},
},
}
context := initTestSchedulerWithOptions(t,
initTestMaster(t, "preemptiom", nil),
false, nil, registry, plugin, []schedulerconfig.PluginConfig{}, time.Second)
defer cleanupTest(t, context)
cs := context.clientSet
@@ -78,14 +152,18 @@ func TestPreemption(t *testing.T) {
v1.ResourceMemory: *resource.NewQuantity(100, resource.DecimalSI)},
}
maxTokens := 1000
tests := []struct {
description string
existingPods []*v1.Pod
pod *v1.Pod
initTokens int
unresolvable bool
preemptedPodIndexes map[int]struct{}
}{
{
description: "basic pod preemption",
initTokens: maxTokens,
existingPods: []*v1.Pod{
initPausePod(context.clientSet, &pausePodConfig{
Name: "victim-pod",
@@ -108,8 +186,61 @@ func TestPreemption(t *testing.T) {
}),
preemptedPodIndexes: map[int]struct{}{0: {}},
},
{
description: "basic pod preemption with filter",
initTokens: 1,
existingPods: []*v1.Pod{
initPausePod(context.clientSet, &pausePodConfig{
Name: "victim-pod",
Namespace: context.ns.Name,
Priority: &lowPriority,
Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(200, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI)},
},
}),
},
pod: initPausePod(cs, &pausePodConfig{
Name: "preemptor-pod",
Namespace: context.ns.Name,
Priority: &highPriority,
Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(200, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI)},
},
}),
preemptedPodIndexes: map[int]struct{}{0: {}},
},
{
// same as the previous test, but the filter is unresolvable.
description: "basic pod preemption with unresolvable filter",
initTokens: 1,
unresolvable: true,
existingPods: []*v1.Pod{
initPausePod(context.clientSet, &pausePodConfig{
Name: "victim-pod",
Namespace: context.ns.Name,
Priority: &lowPriority,
Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(200, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI)},
},
}),
},
pod: initPausePod(cs, &pausePodConfig{
Name: "preemptor-pod",
Namespace: context.ns.Name,
Priority: &highPriority,
Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(200, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI)},
},
}),
preemptedPodIndexes: map[int]struct{}{},
},
{
description: "preemption is performed to satisfy anti-affinity",
initTokens: maxTokens,
existingPods: []*v1.Pod{
initPausePod(cs, &pausePodConfig{
Name: "pod-0", Namespace: context.ns.Name,
@@ -173,6 +304,7 @@ func TestPreemption(t *testing.T) {
{
// This is similar to the previous case only pod-1 is high priority.
description: "preemption is not performed when anti-affinity is not satisfied",
initTokens: maxTokens,
existingPods: []*v1.Pod{
initPausePod(cs, &pausePodConfig{
Name: "pod-0", Namespace: context.ns.Name,
@@ -254,6 +386,8 @@ func TestPreemption(t *testing.T) {
}
for _, test := range tests {
filter.Tokens = test.initTokens
filter.Unresolvable = test.unresolvable
pods := make([]*v1.Pod, len(test.existingPods))
// Create and run existingPods.
for i, p := range test.existingPods {