Mirror of https://github.com/k3s-io/kubernetes.git (synced 2025-07-22 19:31:44 +00:00)
Fix filter plugins not being called during preemption
This commit is contained in:
parent f4887d692e
commit d84a75c140
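
What this change does (summarized from the hunks below): previously the framework's filter plugins ran only inside findNodesThatFit, after the fit predicates had passed, so the preemption path (Preempt -> selectNodesForPreemption -> selectVictimsOnNode), which calls podFitsOnNode directly, never consulted them. The diff turns podFitsOnNode into a method on genericScheduler, threads the per-cycle *framework.PluginContext through the preemption call chain, and moves the RunFilterPlugins call into podFitsOnNode so both the normal scheduling path and preemption evaluate the plugins. Below is a minimal, self-contained sketch of that idea; the types are simplified stand-ins, not the real scheduler API.

package main

import "fmt"

// status is a simplified stand-in for *framework.Status.
type status struct{ ok bool }

func (s status) isSuccess() bool { return s.ok }

// filterPlugin is a simplified stand-in for a framework filter plugin.
type filterPlugin func(pod, node string) status

// podFitsOnNode now runs the fit predicates and the filter plugins together,
// so every caller (normal scheduling and preemption) gets the same answer.
func podFitsOnNode(pod, node string, predicatesPass bool, plugins []filterPlugin) bool {
	if !predicatesPass {
		return false
	}
	for _, f := range plugins {
		if !f(pod, node).isSuccess() {
			// Before the fix this check only happened in findNodesThatFit,
			// so preemption could pick a node a filter plugin would reject.
			return false
		}
	}
	return true
}

func main() {
	rejectAll := filterPlugin(func(pod, node string) status { return status{ok: false} })
	// Preemption path: even if evicting victims would satisfy the predicates,
	// the node is ruled out because the filter plugin rejects it.
	fmt.Println(podFitsOnNode("p", "machine1", true, []filterPlugin{rejectAll})) // false
}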
@@ -137,7 +137,7 @@ type ScheduleAlgorithm interface {
 	// the pod by preempting lower priority pods if possible.
 	// It returns the node where preemption happened, a list of preempted pods, a
 	// list of pods whose nominated node name should be removed, and error if any.
-	Preempt(*v1.Pod, error) (selectedNode *v1.Node, preemptedPods []*v1.Pod, cleanupNominatedPods []*v1.Pod, err error)
+	Preempt(*framework.PluginContext, *v1.Pod, error) (selectedNode *v1.Node, preemptedPods []*v1.Pod, cleanupNominatedPods []*v1.Pod, err error)
 	// Predicates() returns a pointer to a map of predicate functions. This is
 	// exposed for testing.
 	Predicates() map[string]predicates.FitPredicate
@@ -317,7 +317,7 @@ func (g *genericScheduler) selectHost(priorityList schedulerapi.HostPriorityList
 // other pods with the same priority. The nominated pod prevents other pods from
 // using the nominated resources and the nominated pod could take a long time
 // before it is retried after many other pending pods.
-func (g *genericScheduler) Preempt(pod *v1.Pod, scheduleErr error) (*v1.Node, []*v1.Pod, []*v1.Pod, error) {
+func (g *genericScheduler) Preempt(pluginContext *framework.PluginContext, pod *v1.Pod, scheduleErr error) (*v1.Node, []*v1.Pod, []*v1.Pod, error) {
 	// Scheduler may return various types of errors. Consider preemption only if
 	// the error is of type FitError.
 	fitError, ok := scheduleErr.(*FitError)
@@ -342,7 +342,7 @@ func (g *genericScheduler) Preempt(pod *v1.Pod, scheduleErr error) (*v1.Node, []
 	if err != nil {
 		return nil, nil, nil, err
 	}
-	nodeToVictims, err := selectNodesForPreemption(pod, g.nodeInfoSnapshot.NodeInfoMap, potentialNodes, g.predicates,
+	nodeToVictims, err := g.selectNodesForPreemption(pluginContext, pod, g.nodeInfoSnapshot.NodeInfoMap, potentialNodes, g.predicates,
 		g.predicateMetaProducer, g.schedulingQueue, pdbs)
 	if err != nil {
 		return nil, nil, nil, err
@@ -489,7 +489,8 @@ func (g *genericScheduler) findNodesThatFit(pluginContext *framework.PluginConte
 		checkNode := func(i int) {
 			nodeName := g.cache.NodeTree().Next()

-			fits, failedPredicates, err := podFitsOnNode(
+			fits, failedPredicates, status, err := g.podFitsOnNode(
+				pluginContext,
 				pod,
 				meta,
 				g.nodeInfoSnapshot.NodeInfoMap[nodeName],
@@ -504,18 +505,6 @@ func (g *genericScheduler) findNodesThatFit(pluginContext *framework.PluginConte
 				return
 			}
 			if fits {
-				// Iterate each plugin to verify current node
-				status := g.framework.RunFilterPlugins(pluginContext, pod, nodeName)
-				if !status.IsSuccess() {
-					predicateResultLock.Lock()
-					filteredNodesStatuses[nodeName] = status
-					if !status.IsUnschedulable() {
-						errs[status.Message()]++
-					}
-					predicateResultLock.Unlock()
-					return
-				}
-
 				length := atomic.AddInt32(&filteredLen, 1)
 				if length > numNodesToFind {
 					cancel()
@@ -525,7 +514,12 @@ func (g *genericScheduler) findNodesThatFit(pluginContext *framework.PluginConte
 				}
 			} else {
 				predicateResultLock.Lock()
-				failedPredicateMap[nodeName] = failedPredicates
+				if !status.IsSuccess() {
+					filteredNodesStatuses[nodeName] = status
+				}
+				if len(failedPredicates) != 0 {
+					failedPredicateMap[nodeName] = failedPredicates
+				}
 				predicateResultLock.Unlock()
 			}
 		}
@@ -613,15 +607,17 @@ func addNominatedPods(pod *v1.Pod, meta predicates.PredicateMetadata,
 // When it is called from Preempt, we should remove the victims of preemption and
 // add the nominated pods. Removal of the victims is done by SelectVictimsOnNode().
 // It removes victims from meta and NodeInfo before calling this function.
-func podFitsOnNode(
+func (g *genericScheduler) podFitsOnNode(
+	pluginContext *framework.PluginContext,
 	pod *v1.Pod,
 	meta predicates.PredicateMetadata,
 	info *schedulernodeinfo.NodeInfo,
 	predicateFuncs map[string]predicates.FitPredicate,
 	queue internalqueue.SchedulingQueue,
 	alwaysCheckAllPredicates bool,
-) (bool, []predicates.PredicateFailureReason, error) {
+) (bool, []predicates.PredicateFailureReason, *framework.Status, error) {
 	var failedPredicates []predicates.PredicateFailureReason
+	var status *framework.Status

 	podsAdded := false
 	// We run predicates twice in some cases. If the node has greater or equal priority
@@ -660,7 +656,7 @@ func podFitsOnNode(
 			if predicate, exist := predicateFuncs[predicateKey]; exist {
 				fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse)
 				if err != nil {
-					return false, []predicates.PredicateFailureReason{}, err
+					return false, []predicates.PredicateFailureReason{}, nil, err
 				}

 				if !fit {
@@ -676,9 +672,14 @@ func podFitsOnNode(
 				}
 			}
 		}
+
+		status = g.framework.RunFilterPlugins(pluginContext, pod, info.Node().Name)
+		if !status.IsSuccess() && !status.IsUnschedulable() {
+			return false, failedPredicates, status, status.AsError()
+		}
 	}

-	return len(failedPredicates) == 0, failedPredicates, nil
+	return len(failedPredicates) == 0 && status.IsSuccess(), failedPredicates, status, nil
 }

 // PrioritizeNodes prioritizes the nodes by running the individual priority functions in parallel.
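
The hunk above is the core of the fix: RunFilterPlugins now executes inside podFitsOnNode, and the function returns a fourth value, the plugin *framework.Status. An unschedulable status simply rules the node out, while any other unsuccessful status is surfaced as an error via status.AsError(). A rough sketch of how a caller can read the new return values, using simplified stand-ins rather than the real framework types:

package main

import (
	"errors"
	"fmt"
)

// code is a simplified stand-in for the framework status codes.
type code int

const (
	success code = iota
	unschedulable
	internalError
)

// interpretFit mirrors how the new return values of podFitsOnNode can be read:
// a hard error aborts the attempt, a failed predicate or an unschedulable
// plugin status just rules the node out, and otherwise the node fits.
func interpretFit(failedPredicates []string, c code, err error) (fits bool, abort error) {
	if err != nil {
		return false, err // plugin failure that is not plain "unschedulable"
	}
	return len(failedPredicates) == 0 && c == success, nil
}

func main() {
	fits, err := interpretFit(nil, unschedulable, nil)
	fmt.Println(fits, err) // false <nil>: the node is filtered out, not an error

	_, err = interpretFit(nil, internalError, errors.New("plugin failed"))
	fmt.Println(err) // "plugin failed" is propagated to the caller
}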
@@ -992,7 +993,9 @@ func pickOneNodeForPreemption(nodesToVictims map[*v1.Node]*schedulerapi.Victims)

 // selectNodesForPreemption finds all the nodes with possible victims for
 // preemption in parallel.
-func selectNodesForPreemption(pod *v1.Pod,
+func (g *genericScheduler) selectNodesForPreemption(
+	pluginContext *framework.PluginContext,
+	pod *v1.Pod,
 	nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo,
 	potentialNodes []*v1.Node,
 	fitPredicates map[string]predicates.FitPredicate,
@@ -1011,7 +1014,7 @@ func selectNodesForPreemption(pod *v1.Pod,
 		if meta != nil {
 			metaCopy = meta.ShallowCopy()
 		}
-		pods, numPDBViolations, fits := selectVictimsOnNode(pod, metaCopy, nodeNameToInfo[nodeName], fitPredicates, queue, pdbs)
+		pods, numPDBViolations, fits := g.selectVictimsOnNode(pluginContext, pod, metaCopy, nodeNameToInfo[nodeName], fitPredicates, queue, pdbs)
 		if fits {
 			resultLock.Lock()
 			victims := schedulerapi.Victims{
@@ -1080,7 +1083,8 @@ func filterPodsWithPDBViolation(pods []interface{}, pdbs []*policy.PodDisruption
 // NOTE: This function assumes that it is never called if "pod" cannot be scheduled
 // due to pod affinity, node affinity, or node anti-affinity reasons. None of
 // these predicates can be satisfied by removing more pods from the node.
-func selectVictimsOnNode(
+func (g *genericScheduler) selectVictimsOnNode(
+	pluginContext *framework.PluginContext,
 	pod *v1.Pod,
 	meta predicates.PredicateMetadata,
 	nodeInfo *schedulernodeinfo.NodeInfo,
@@ -1121,10 +1125,11 @@ func selectVictimsOnNode(
 	// inter-pod affinity to one or more victims, but we have decided not to
 	// support this case for performance reasons. Having affinity to lower
 	// priority pods is not a recommended configuration anyway.
-	if fits, _, err := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, queue, false); !fits {
+	if fits, _, _, err := g.podFitsOnNode(pluginContext, pod, meta, nodeInfoCopy, fitPredicates, queue, false); !fits {
 		if err != nil {
 			klog.Warningf("Encountered error while selecting victims on node %v: %v", nodeInfo.Node().Name, err)
 		}
+
 		return nil, 0, false
 	}
 	var victims []*v1.Pod
@@ -1136,7 +1141,7 @@ func selectVictimsOnNode(
 	violatingVictims, nonViolatingVictims := filterPodsWithPDBViolation(potentialVictims.Items, pdbs)
 	reprievePod := func(p *v1.Pod) bool {
 		addPod(p)
-		fits, _, _ := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, queue, false)
+		fits, _, _, _ := g.podFitsOnNode(pluginContext, pod, meta, nodeInfoCopy, fitPredicates, queue, false)
 		if !fits {
 			removePod(p)
 			victims = append(victims, p)
@@ -147,8 +147,6 @@ type FakeFilterPlugin struct {
 	returnCode framework.Code
 }

-var filterPlugin = &FakeFilterPlugin{}
-
 // Name returns name of the plugin.
 func (fp *FakeFilterPlugin) Name() string {
 	return "fake-filter-plugin"
@@ -172,9 +170,11 @@ func (fp *FakeFilterPlugin) Filter(pc *framework.PluginContext, pod *v1.Pod, nod
 	return framework.NewStatus(fp.returnCode, fmt.Sprintf("injecting failure for pod %v", pod.Name))
 }

-// NewFilterPlugin is the factory for filter plugin.
-func NewFilterPlugin(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) {
-	return filterPlugin, nil
+// newPlugin returns a plugin factory with specified Plugin.
+func newPlugin(plugin framework.Plugin) framework.PluginFactory {
+	return func(_ *runtime.Unknown, fh framework.FrameworkHandle) (framework.Plugin, error) {
+		return plugin, nil
+	}
 }

 func makeNodeList(nodeNames []string) []*v1.Node {
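
The test helper above replaces a package-level filterPlugin variable and fixed factory with a closure-based factory, so each test can register its own FakeFilterPlugin instance and inspect it afterwards without interference between tests. The same pattern in isolation, with simplified stand-ins for framework.Plugin and framework.PluginFactory (assumed shapes, not the real API):

package main

import "fmt"

// plugin and pluginFactory are simplified stand-ins; the real factory also
// receives plugin arguments and a FrameworkHandle.
type plugin interface{ Name() string }

type pluginFactory func() (plugin, error)

type fakeFilter struct{ calls int }

func (f *fakeFilter) Name() string { return "fake-filter-plugin" }

// newPlugin returns a factory that always hands back the same instance, so a
// test can register it in a registry and still read its counters afterwards.
func newPlugin(p plugin) pluginFactory {
	return func() (plugin, error) { return p, nil }
}

func main() {
	f := &fakeFilter{}
	registry := map[string]pluginFactory{f.Name(): newPlugin(f)}

	p, _ := registry["fake-filter-plugin"]()
	fmt.Println(p.Name(), p.(*fakeFilter) == f) // fake-filter-plugin true
}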
@@ -258,7 +258,8 @@ func TestSelectHost(t *testing.T) {
 func TestGenericScheduler(t *testing.T) {
 	defer algorithmpredicates.SetPredicatesOrderingDuringTest(order)()

-	filterPluginRegistry := framework.Registry{filterPlugin.Name(): NewFilterPlugin}
+	filterPlugin := &FakeFilterPlugin{}
+	filterPluginRegistry := framework.Registry{filterPlugin.Name(): newPlugin(filterPlugin)}
 	filterFramework, err := framework.NewFramework(filterPluginRegistry, &schedulerconfig.Plugins{
 		Filter: &schedulerconfig.PluginSet{
 			Enabled: []schedulerconfig.Plugin{
@@ -1119,14 +1120,32 @@ var startTime20190107 = metav1.Date(2019, 1, 7, 1, 1, 1, 0, time.UTC)
 // that podsFitsOnNode works correctly and is tested separately.
 func TestSelectNodesForPreemption(t *testing.T) {
 	defer algorithmpredicates.SetPredicatesOrderingDuringTest(order)()
+
+	filterPlugin := &FakeFilterPlugin{}
+	filterPluginRegistry := framework.Registry{filterPlugin.Name(): newPlugin(filterPlugin)}
+	filterFramework, err := framework.NewFramework(filterPluginRegistry, &schedulerconfig.Plugins{
+		Filter: &schedulerconfig.PluginSet{
+			Enabled: []schedulerconfig.Plugin{
+				{
+					Name: filterPlugin.Name(),
+				},
+			},
+		},
+	}, []schedulerconfig.PluginConfig{})
+	if err != nil {
+		t.Errorf("Unexpected error when initialize scheduling framework, err :%v", err.Error())
+	}
+
 	tests := []struct {
-		name                 string
-		predicates           map[string]algorithmpredicates.FitPredicate
-		nodes                []string
-		pod                  *v1.Pod
-		pods                 []*v1.Pod
-		expected             map[string]map[string]bool // Map from node name to a list of pods names which should be preempted.
-		addAffinityPredicate bool
+		name                    string
+		predicates              map[string]algorithmpredicates.FitPredicate
+		nodes                   []string
+		pod                     *v1.Pod
+		pods                    []*v1.Pod
+		filterReturnCode        framework.Code
+		expected                map[string]map[string]bool // Map from node name to a list of pods names which should be preempted.
+		expectednumFilterCalled int32
+		addAffinityPredicate    bool
 	}{
 		{
 			name: "a pod that does not fit on any machine",
@@ -1136,7 +1155,8 @@ func TestSelectNodesForPreemption(t *testing.T) {
 			pods: []*v1.Pod{
 				{ObjectMeta: metav1.ObjectMeta{Name: "a", UID: types.UID("a")}, Spec: v1.PodSpec{Priority: &midPriority, NodeName: "machine1"}},
 				{ObjectMeta: metav1.ObjectMeta{Name: "b", UID: types.UID("b")}, Spec: v1.PodSpec{Priority: &midPriority, NodeName: "machine2"}}},
-			expected: map[string]map[string]bool{},
+			expected:                map[string]map[string]bool{},
+			expectednumFilterCalled: 2,
 		},
 		{
 			name: "a pod that fits with no preemption",
@@ -1146,7 +1166,8 @@ func TestSelectNodesForPreemption(t *testing.T) {
 			pods: []*v1.Pod{
 				{ObjectMeta: metav1.ObjectMeta{Name: "a", UID: types.UID("a")}, Spec: v1.PodSpec{Priority: &midPriority, NodeName: "machine1"}},
 				{ObjectMeta: metav1.ObjectMeta{Name: "b", UID: types.UID("b")}, Spec: v1.PodSpec{Priority: &midPriority, NodeName: "machine2"}}},
-			expected: map[string]map[string]bool{"machine1": {}, "machine2": {}},
+			expected:                map[string]map[string]bool{"machine1": {}, "machine2": {}},
+			expectednumFilterCalled: 4,
 		},
 		{
 			name: "a pod that fits on one machine with no preemption",
@@ -1156,7 +1177,8 @@ func TestSelectNodesForPreemption(t *testing.T) {
 			pods: []*v1.Pod{
 				{ObjectMeta: metav1.ObjectMeta{Name: "a", UID: types.UID("a")}, Spec: v1.PodSpec{Priority: &midPriority, NodeName: "machine1"}},
 				{ObjectMeta: metav1.ObjectMeta{Name: "b", UID: types.UID("b")}, Spec: v1.PodSpec{Priority: &midPriority, NodeName: "machine2"}}},
-			expected: map[string]map[string]bool{"machine1": {}},
+			expected:                map[string]map[string]bool{"machine1": {}},
+			expectednumFilterCalled: 3,
 		},
 		{
 			name: "a pod that fits on both machines when lower priority pods are preempted",
@@ -1166,7 +1188,8 @@ func TestSelectNodesForPreemption(t *testing.T) {
 			pods: []*v1.Pod{
 				{ObjectMeta: metav1.ObjectMeta{Name: "a", UID: types.UID("a")}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &midPriority, NodeName: "machine1"}},
 				{ObjectMeta: metav1.ObjectMeta{Name: "b", UID: types.UID("b")}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &midPriority, NodeName: "machine2"}}},
-			expected: map[string]map[string]bool{"machine1": {"a": true}, "machine2": {"b": true}},
+			expected:                map[string]map[string]bool{"machine1": {"a": true}, "machine2": {"b": true}},
+			expectednumFilterCalled: 4,
 		},
 		{
 			name: "a pod that would fit on the machines, but other pods running are higher priority",
@@ -1176,7 +1199,8 @@ func TestSelectNodesForPreemption(t *testing.T) {
 			pods: []*v1.Pod{
 				{ObjectMeta: metav1.ObjectMeta{Name: "a", UID: types.UID("a")}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &midPriority, NodeName: "machine1"}},
 				{ObjectMeta: metav1.ObjectMeta{Name: "b", UID: types.UID("b")}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &midPriority, NodeName: "machine2"}}},
-			expected: map[string]map[string]bool{},
+			expected:                map[string]map[string]bool{},
+			expectednumFilterCalled: 2,
 		},
 		{
 			name: "medium priority pod is preempted, but lower priority one stays as it is small",
@@ -1187,7 +1211,8 @@ func TestSelectNodesForPreemption(t *testing.T) {
 				{ObjectMeta: metav1.ObjectMeta{Name: "a", UID: types.UID("a")}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &lowPriority, NodeName: "machine1"}},
 				{ObjectMeta: metav1.ObjectMeta{Name: "b", UID: types.UID("b")}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &midPriority, NodeName: "machine1"}},
 				{ObjectMeta: metav1.ObjectMeta{Name: "c", UID: types.UID("c")}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &midPriority, NodeName: "machine2"}}},
-			expected: map[string]map[string]bool{"machine1": {"b": true}, "machine2": {"c": true}},
+			expected:                map[string]map[string]bool{"machine1": {"b": true}, "machine2": {"c": true}},
+			expectednumFilterCalled: 5,
 		},
 		{
 			name: "mixed priority pods are preempted",
@@ -1200,7 +1225,8 @@ func TestSelectNodesForPreemption(t *testing.T) {
 				{ObjectMeta: metav1.ObjectMeta{Name: "c", UID: types.UID("c")}, Spec: v1.PodSpec{Containers: mediumContainers, Priority: &midPriority, NodeName: "machine1"}},
 				{ObjectMeta: metav1.ObjectMeta{Name: "d", UID: types.UID("d")}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &highPriority, NodeName: "machine1"}},
 				{ObjectMeta: metav1.ObjectMeta{Name: "e", UID: types.UID("e")}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &highPriority, NodeName: "machine2"}}},
-			expected: map[string]map[string]bool{"machine1": {"b": true, "c": true}},
+			expected:                map[string]map[string]bool{"machine1": {"b": true, "c": true}},
+			expectednumFilterCalled: 5,
 		},
 		{
 			name: "mixed priority pods are preempted, pick later StartTime one when priorities are equal",
@@ -1213,7 +1239,8 @@ func TestSelectNodesForPreemption(t *testing.T) {
 				{ObjectMeta: metav1.ObjectMeta{Name: "c", UID: types.UID("c")}, Spec: v1.PodSpec{Containers: mediumContainers, Priority: &midPriority, NodeName: "machine1"}, Status: v1.PodStatus{StartTime: &startTime20190105}},
 				{ObjectMeta: metav1.ObjectMeta{Name: "d", UID: types.UID("d")}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &highPriority, NodeName: "machine1"}, Status: v1.PodStatus{StartTime: &startTime20190104}},
 				{ObjectMeta: metav1.ObjectMeta{Name: "e", UID: types.UID("e")}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &highPriority, NodeName: "machine2"}, Status: v1.PodStatus{StartTime: &startTime20190103}}},
-			expected: map[string]map[string]bool{"machine1": {"a": true, "c": true}},
+			expected:                map[string]map[string]bool{"machine1": {"a": true, "c": true}},
+			expectednumFilterCalled: 5,
 		},
 		{
 			name: "pod with anti-affinity is preempted",
@@ -1243,8 +1270,9 @@ func TestSelectNodesForPreemption(t *testing.T) {
 				{ObjectMeta: metav1.ObjectMeta{Name: "b", UID: types.UID("b")}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &midPriority, NodeName: "machine1"}},
 				{ObjectMeta: metav1.ObjectMeta{Name: "d", UID: types.UID("d")}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &highPriority, NodeName: "machine1"}},
 				{ObjectMeta: metav1.ObjectMeta{Name: "e", UID: types.UID("e")}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &highPriority, NodeName: "machine2"}}},
-			expected:             map[string]map[string]bool{"machine1": {"a": true}, "machine2": {}},
-			addAffinityPredicate: true,
+			expected:                map[string]map[string]bool{"machine1": {"a": true}, "machine2": {}},
+			expectednumFilterCalled: 4,
+			addAffinityPredicate:    true,
 		},
 		{
 			name: "preemption to resolve even pods spread FitError",
@@ -1320,11 +1348,53 @@ func TestSelectNodesForPreemption(t *testing.T) {
 				"node-a": {"pod-a2": true},
 				"node-b": {"pod-b1": true},
 			},
+			expectednumFilterCalled: 6,
 		},
+		{
+			name:       "get Unschedulable in the preemption phase when the filter plugins filtering the nodes",
+			predicates: map[string]algorithmpredicates.FitPredicate{"matches": algorithmpredicates.PodFitsResources},
+			nodes:      []string{"machine1", "machine2"},
+			pod:        &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &highPriority}},
+			pods: []*v1.Pod{
+				{ObjectMeta: metav1.ObjectMeta{Name: "a", UID: types.UID("a")}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &midPriority, NodeName: "machine1"}},
+				{ObjectMeta: metav1.ObjectMeta{Name: "b", UID: types.UID("b")}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &midPriority, NodeName: "machine2"}}},
+			filterReturnCode:        framework.Unschedulable,
+			expected:                map[string]map[string]bool{},
+			expectednumFilterCalled: 2,
+		},
 	}
 	labelKeys := []string{"hostname", "zone", "region"}
 	for _, test := range tests {
 		t.Run(test.name, func(t *testing.T) {
+			cache := internalcache.New(time.Duration(0), wait.NeverStop)
+			for _, pod := range test.pods {
+				cache.AddPod(pod)
+			}
+			for _, name := range test.nodes {
+				cache.AddNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: name, Labels: map[string]string{"hostname": name}}})
+			}
+
+			predMetaProducer := algorithmpredicates.EmptyPredicateMetadataProducer
+
+			filterPlugin.returnCode = test.filterReturnCode
+			scheduler := NewGenericScheduler(
+				nil,
+				internalqueue.NewSchedulingQueue(nil, nil),
+				test.predicates,
+				predMetaProducer,
+				nil,
+				priorities.EmptyPriorityMetadataProducer,
+				filterFramework,
+				[]algorithm.SchedulerExtender{},
+				nil,
+				nil,
+				schedulertesting.FakePDBLister{},
+				false,
+				false,
+				schedulerapi.DefaultPercentageOfNodesToScore,
+				false)
+			g := scheduler.(*genericScheduler)
+
 			assignDefaultStartTime(test.pods)

 			nodes := []*v1.Node{}
@@ -1348,13 +1418,21 @@ func TestSelectNodesForPreemption(t *testing.T) {
 			newnode := makeNode("newnode", 1000*5, priorityutil.DefaultMemoryRequest*5)
 			newnode.ObjectMeta.Labels = map[string]string{"hostname": "newnode"}
 			nodes = append(nodes, newnode)
-			nodeToPods, err := selectNodesForPreemption(test.pod, nodeNameToInfo, nodes, test.predicates, PredicateMetadata, nil, nil)
+			pluginContext := framework.NewPluginContext()
+			nodeToPods, err := g.selectNodesForPreemption(pluginContext, test.pod, nodeNameToInfo, nodes, test.predicates, PredicateMetadata, nil, nil)
 			if err != nil {
 				t.Error(err)
 			}
+
+			if test.expectednumFilterCalled != filterPlugin.numFilterCalled {
+				t.Errorf("expected filterPlugin.numFilterCalled is %d,nut got %d", test.expectednumFilterCalled, filterPlugin.numFilterCalled)
+			}
+
 			if err := checkPreemptionVictims(test.expected, nodeToPods); err != nil {
 				t.Error(err)
 			}
+
+			filterPlugin.reset()
 		})
 	}
 }
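
The test loop above injects a per-case filter return code and then asserts filterPlugin.numFilterCalled against expectednumFilterCalled before calling filterPlugin.reset(). Neither numFilterCalled nor reset() appears in this diff, so the following is only a plausible reconstruction of what the fake plugin's bookkeeping might look like; the field and method names are assumed from how the test uses them.

package main

import "fmt"

// code stands in for framework.Code in this sketch.
type code int

const (
	success code = iota
	unschedulable
)

// fakeFilterPlugin is a hypothetical reconstruction: the diff only shows the
// returnCode field, so numFilterCalled and reset() are assumed.
type fakeFilterPlugin struct {
	numFilterCalled int32
	returnCode      code
}

// filter counts every invocation and answers with the injected code.
func (fp *fakeFilterPlugin) filter(pod, node string) code {
	fp.numFilterCalled++
	return fp.returnCode
}

// reset clears the counter and the injected result between test cases.
func (fp *fakeFilterPlugin) reset() {
	fp.numFilterCalled = 0
	fp.returnCode = success
}

func main() {
	fp := &fakeFilterPlugin{returnCode: unschedulable}
	fp.filter("preemptor", "machine1")
	fp.filter("preemptor", "machine2")
	fmt.Println(fp.numFilterCalled) // 2, matching expectednumFilterCalled when every node is filtered out
	fp.reset()
}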
@@ -1554,6 +1632,9 @@ func TestPickOneNodeForPreemption(t *testing.T) {
 	}
 	for _, test := range tests {
 		t.Run(test.name, func(t *testing.T) {
+			g := &genericScheduler{
+				framework: emptyFramework,
+			}
 			assignDefaultStartTime(test.pods)

 			nodes := []*v1.Node{}
@@ -1561,7 +1642,8 @@ func TestPickOneNodeForPreemption(t *testing.T) {
 				nodes = append(nodes, makeNode(n, priorityutil.DefaultMilliCPURequest*5, priorityutil.DefaultMemoryRequest*5))
 			}
 			nodeNameToInfo := schedulernodeinfo.CreateNodeNameToInfoMap(test.pods, nodes)
-			candidateNodes, _ := selectNodesForPreemption(test.pod, nodeNameToInfo, nodes, test.predicates, PredicateMetadata, nil, nil)
+			pluginContext := framework.NewPluginContext()
+			candidateNodes, _ := g.selectNodesForPreemption(pluginContext, test.pod, nodeNameToInfo, nodes, test.predicates, PredicateMetadata, nil, nil)
 			node := pickOneNodeForPreemption(candidateNodes)
 			found := false
 			for _, nodeName := range test.expected {
@@ -2045,13 +2127,14 @@ func TestPreempt(t *testing.T) {
 				false,
 				schedulerapi.DefaultPercentageOfNodesToScore,
 				true)
+			pluginContext := framework.NewPluginContext()
 			scheduler.(*genericScheduler).snapshot()
 			// Call Preempt and check the expected results.
 			failedPredMap := defaultFailedPredMap
 			if test.failedPredMap != nil {
 				failedPredMap = test.failedPredMap
 			}
-			node, victims, _, err := scheduler.Preempt(test.pod, error(&FitError{Pod: test.pod, FailedPredicates: failedPredMap}))
+			node, victims, _, err := scheduler.Preempt(pluginContext, test.pod, error(&FitError{Pod: test.pod, FailedPredicates: failedPredMap}))
 			if err != nil {
 				t.Errorf("unexpected error in preemption: %v", err)
 			}
@@ -2081,7 +2164,7 @@ func TestPreempt(t *testing.T) {
 				test.pod.Status.NominatedNodeName = node.Name
 			}
 			// Call preempt again and make sure it doesn't preempt any more pods.
-			node, victims, _, err = scheduler.Preempt(test.pod, error(&FitError{Pod: test.pod, FailedPredicates: failedPredMap}))
+			node, victims, _, err = scheduler.Preempt(pluginContext, test.pod, error(&FitError{Pod: test.pod, FailedPredicates: failedPredMap}))
 			if err != nil {
 				t.Errorf("unexpected error in preemption: %v", err)
 			}
@@ -349,14 +349,14 @@ func (sched *Scheduler) schedule(pod *v1.Pod, pluginContext *framework.PluginCon
 // preempt tries to create room for a pod that has failed to schedule, by preempting lower priority pods if possible.
 // If it succeeds, it adds the name of the node where preemption has happened to the pod spec.
 // It returns the node name and an error if any.
-func (sched *Scheduler) preempt(fwk framework.Framework, preemptor *v1.Pod, scheduleErr error) (string, error) {
+func (sched *Scheduler) preempt(pluginContext *framework.PluginContext, fwk framework.Framework, preemptor *v1.Pod, scheduleErr error) (string, error) {
 	preemptor, err := sched.PodPreemptor.GetUpdatedPod(preemptor)
 	if err != nil {
 		klog.Errorf("Error getting the updated preemptor pod object: %v", err)
 		return "", err
 	}

-	node, victims, nominatedPodsToClear, err := sched.Algorithm.Preempt(preemptor, scheduleErr)
+	node, victims, nominatedPodsToClear, err := sched.Algorithm.Preempt(pluginContext, preemptor, scheduleErr)
 	if err != nil {
 		klog.Errorf("Error preempting victims to make room for %v/%v: %v", preemptor.Namespace, preemptor.Name, err)
 		return "", err
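
With this hunk the PluginContext created for the scheduling cycle is handed from scheduleOne through preempt into Algorithm.Preempt, so filter plugins observe the same per-cycle state whether they run during normal filtering or while selecting preemption victims. A small sketch of that threading, with a plain map standing in for *framework.PluginContext:

package main

import "fmt"

// pluginContext stands in for *framework.PluginContext: per-cycle storage that
// plugins use to share data between extension points.
type pluginContext map[string]string

// schedule records some per-cycle state and fails, as a FitError would.
func schedule(pc pluginContext, pod string) error {
	pc["attempted"] = pod
	return fmt.Errorf("0/2 nodes are available for %s", pod)
}

// preempt receives the same context as the failed schedule call, so any filter
// plugin consulted while picking victims sees the same per-cycle state.
func preempt(pc pluginContext, pod string) {
	fmt.Printf("preempting for %s, cycle state: %v\n", pod, pc)
}

func main() {
	pc := pluginContext{}
	if err := schedule(pc, "pod-a"); err != nil {
		preempt(pc, "pod-a")
	}
}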
@@ -544,7 +544,7 @@ func (sched *Scheduler) scheduleOne() {
 					" No preemption is performed.")
 			} else {
 				preemptionStartTime := time.Now()
-				sched.preempt(fwk, pod, fitError)
+				sched.preempt(pluginContext, fwk, pod, fitError)
 				metrics.PreemptionAttempts.Inc()
 				metrics.SchedulingAlgorithmPremptionEvaluationDuration.Observe(metrics.SinceInSeconds(preemptionStartTime))
 				metrics.DeprecatedSchedulingAlgorithmPremptionEvaluationDuration.Observe(metrics.SinceInMicroseconds(preemptionStartTime))
@@ -164,7 +164,7 @@ func (es mockScheduler) Prioritizers() []priorities.PriorityConfig {
 	return nil
 }

-func (es mockScheduler) Preempt(pod *v1.Pod, scheduleErr error) (*v1.Node, []*v1.Pod, []*v1.Pod, error) {
+func (es mockScheduler) Preempt(pc *framework.PluginContext, pod *v1.Pod, scheduleErr error) (*v1.Node, []*v1.Pod, []*v1.Pod, error) {
 	return nil, nil, nil, nil
 }