bugfix(scheduler): preemption picks wrong victim node with higher priority pod on it.

Introducing pdb to preemption had disrupted the orderliness of pods in the victims,
which would leads picking wrong victim node with higher priority pod on it.
This commit is contained in:
NoicFank 2024-10-24 11:50:46 +08:00
parent 1ef7d66879
commit 2d540ade5f
3 changed files with 105 additions and 0 deletions

View File

@ -191,6 +191,8 @@ func (pl *DefaultPreemption) SelectVictimsOnNode(
} }
var victims []*v1.Pod var victims []*v1.Pod
numViolatingVictim := 0 numViolatingVictim := 0
// Sort potentialVictims by pod priority from high to low, which ensures to
// reprieve higher priority pods first.
sort.Slice(potentialVictims, func(i, j int) bool { return util.MoreImportantPod(potentialVictims[i].Pod, potentialVictims[j].Pod) }) sort.Slice(potentialVictims, func(i, j int) bool { return util.MoreImportantPod(potentialVictims[i].Pod, potentialVictims[j].Pod) })
// Try to reprieve as many pods as possible. We first try to reprieve the PDB // Try to reprieve as many pods as possible. We first try to reprieve the PDB
// violating victims and then other non-violating ones. In both cases, we start // violating victims and then other non-violating ones. In both cases, we start
@ -225,6 +227,11 @@ func (pl *DefaultPreemption) SelectVictimsOnNode(
return nil, 0, framework.AsStatus(err) return nil, 0, framework.AsStatus(err)
} }
} }
// Sort victims after reprieving pods to keep the pods in the victims sorted in order of priority from high to low.
if len(violatingVictims) != 0 && len(nonViolatingVictims) != 0 {
sort.Slice(victims, func(i, j int) bool { return util.MoreImportantPod(victims[i], victims[j]) })
}
return victims, numViolatingVictim, framework.NewStatus(framework.Success) return victims, numViolatingVictim, framework.NewStatus(framework.Success)
} }

View File

@ -21,6 +21,7 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"math"
"math/rand" "math/rand"
"sort" "sort"
"strings" "strings"
@ -142,6 +143,12 @@ func (pl *TestPlugin) Filter(ctx context.Context, state *framework.CycleState, p
return nil return nil
} }
const (
LabelKeyIsViolatingPDB = "test.kubernetes.io/is-violating-pdb"
LabelValueViolatingPDB = "violating"
LabelValueNonViolatingPDB = "non-violating"
)
func TestPostFilter(t *testing.T) { func TestPostFilter(t *testing.T) {
onePodRes := map[v1.ResourceName]string{v1.ResourcePods: "1"} onePodRes := map[v1.ResourceName]string{v1.ResourcePods: "1"}
nodeRes := map[v1.ResourceName]string{v1.ResourceCPU: "200m", v1.ResourceMemory: "400"} nodeRes := map[v1.ResourceName]string{v1.ResourceCPU: "200m", v1.ResourceMemory: "400"}
@ -149,6 +156,7 @@ func TestPostFilter(t *testing.T) {
name string name string
pod *v1.Pod pod *v1.Pod
pods []*v1.Pod pods []*v1.Pod
pdbs []*policy.PodDisruptionBudget
nodes []*v1.Node nodes []*v1.Node
filteredNodesStatuses framework.NodeToStatusMap filteredNodesStatuses framework.NodeToStatusMap
extender framework.Extender extender framework.Extender
@ -218,6 +226,29 @@ func TestPostFilter(t *testing.T) {
wantResult: framework.NewPostFilterResultWithNominatedNode("node2"), wantResult: framework.NewPostFilterResultWithNominatedNode("node2"),
wantStatus: framework.NewStatus(framework.Success), wantStatus: framework.NewStatus(framework.Success),
}, },
{
name: "pod can be made schedulable on minHighestPriority node",
pod: st.MakePod().Name("p").UID("p").Namespace(v1.NamespaceDefault).Priority(veryHighPriority).Obj(),
pods: []*v1.Pod{
st.MakePod().Name("p1").UID("p1").Label(LabelKeyIsViolatingPDB, LabelValueNonViolatingPDB).Namespace(v1.NamespaceDefault).Priority(highPriority).Node("node1").Obj(),
st.MakePod().Name("p2").UID("p2").Label(LabelKeyIsViolatingPDB, LabelValueViolatingPDB).Namespace(v1.NamespaceDefault).Priority(lowPriority).Node("node1").Obj(),
st.MakePod().Name("p3").UID("p3").Label(LabelKeyIsViolatingPDB, LabelValueViolatingPDB).Namespace(v1.NamespaceDefault).Priority(midPriority).Node("node2").Obj(),
},
pdbs: []*policy.PodDisruptionBudget{
st.MakePDB().Name("violating-pdb").Namespace(v1.NamespaceDefault).MatchLabel(LabelKeyIsViolatingPDB, LabelValueViolatingPDB).MinAvailable("100%").Obj(),
st.MakePDB().Name("non-violating-pdb").Namespace(v1.NamespaceDefault).MatchLabel(LabelKeyIsViolatingPDB, LabelValueNonViolatingPDB).MinAvailable("0").DisruptionsAllowed(math.MaxInt32).Obj(),
},
nodes: []*v1.Node{
st.MakeNode().Name("node1").Capacity(onePodRes).Obj(),
st.MakeNode().Name("node2").Capacity(onePodRes).Obj(),
},
filteredNodesStatuses: framework.NodeToStatusMap{
"node1": framework.NewStatus(framework.Unschedulable),
"node2": framework.NewStatus(framework.Unschedulable),
},
wantResult: framework.NewPostFilterResultWithNominatedNode("node2"),
wantStatus: framework.NewStatus(framework.Success),
},
{ {
name: "preemption result filtered out by extenders", name: "preemption result filtered out by extenders",
pod: st.MakePod().Name("p").UID("p").Namespace(v1.NamespaceDefault).Priority(highPriority).Obj(), pod: st.MakePod().Name("p").UID("p").Namespace(v1.NamespaceDefault).Priority(highPriority).Obj(),
@ -347,6 +378,13 @@ func TestPostFilter(t *testing.T) {
for i := range tt.pods { for i := range tt.pods {
podInformer.GetStore().Add(tt.pods[i]) podInformer.GetStore().Add(tt.pods[i])
} }
pdbInformer := informerFactory.Policy().V1().PodDisruptionBudgets().Informer()
for i := range tt.pdbs {
if err := pdbInformer.GetStore().Add(tt.pdbs[i]); err != nil {
t.Fatal(err)
}
}
// Register NodeResourceFit as the Filter & PreFilter plugin. // Register NodeResourceFit as the Filter & PreFilter plugin.
registeredPlugins := []tf.RegisterPluginFunc{ registeredPlugins := []tf.RegisterPluginFunc{
tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),

View File

@ -21,11 +21,13 @@ import (
"time" "time"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
policy "k8s.io/api/policy/v1"
resourceapi "k8s.io/api/resource/v1alpha3" resourceapi "k8s.io/api/resource/v1alpha3"
"k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
imageutils "k8s.io/kubernetes/test/utils/image" imageutils "k8s.io/kubernetes/test/utils/image"
"k8s.io/utils/ptr" "k8s.io/utils/ptr"
) )
@ -212,6 +214,64 @@ func (c *ContainerWrapper) ResourceLimits(limMap map[v1.ResourceName]string) *Co
return c return c
} }
// PodDisruptionBudgetWrapper wraps a PodDisruptionBudget inside.
type PodDisruptionBudgetWrapper struct {
policy.PodDisruptionBudget
}
// MakePDB creates a PodDisruptionBudget wrapper.
func MakePDB() *PodDisruptionBudgetWrapper {
return &PodDisruptionBudgetWrapper{policy.PodDisruptionBudget{}}
}
// Obj returns the inner PodDisruptionBudget.
func (p *PodDisruptionBudgetWrapper) Obj() *policy.PodDisruptionBudget {
return &p.PodDisruptionBudget
}
// Name sets `name` as the name of the inner PodDisruptionBudget.
func (p *PodDisruptionBudgetWrapper) Name(name string) *PodDisruptionBudgetWrapper {
p.SetName(name)
return p
}
// Namespace sets `namespace` as the namespace of the inner PodDisruptionBudget.
func (p *PodDisruptionBudgetWrapper) Namespace(namespace string) *PodDisruptionBudgetWrapper {
p.SetNamespace(namespace)
return p
}
// MinAvailable sets `minAvailable` to the inner PodDisruptionBudget.Spec.MinAvailable.
func (p *PodDisruptionBudgetWrapper) MinAvailable(minAvailable string) *PodDisruptionBudgetWrapper {
p.Spec.MinAvailable = &intstr.IntOrString{
Type: intstr.String,
StrVal: minAvailable,
}
return p
}
// MatchLabel adds a {key,value} to the inner PodDisruptionBudget.Spec.Selector.MatchLabels.
func (p *PodDisruptionBudgetWrapper) MatchLabel(key, value string) *PodDisruptionBudgetWrapper {
selector := p.Spec.Selector
if selector == nil {
selector = &metav1.LabelSelector{}
}
matchLabels := selector.MatchLabels
if matchLabels == nil {
matchLabels = map[string]string{}
}
matchLabels[key] = value
selector.MatchLabels = matchLabels
p.Spec.Selector = selector
return p
}
// DisruptionsAllowed sets `disruptionsAllowed` to the inner PodDisruptionBudget.Status.DisruptionsAllowed.
func (p *PodDisruptionBudgetWrapper) DisruptionsAllowed(disruptionsAllowed int32) *PodDisruptionBudgetWrapper {
p.Status.DisruptionsAllowed = disruptionsAllowed
return p
}
// PodWrapper wraps a Pod inside. // PodWrapper wraps a Pod inside.
type PodWrapper struct{ v1.Pod } type PodWrapper struct{ v1.Pod }