Support score extension function in preemption.

This commit is contained in:
lianghao208
2023-11-14 11:33:13 +08:00
parent 5dc9453700
commit 34e620d18c
4 changed files with 210 additions and 56 deletions

View File

@@ -82,6 +82,47 @@ func (pl *FakePostFilterPlugin) PodEligibleToPreemptOthers(pod *v1.Pod, nominate
return true, ""
}
func (pl *FakePostFilterPlugin) OrderedScoreFuncs(ctx context.Context, nodesToVictims map[string]*extenderv1.Victims) []func(node string) int64 {
return nil
}
type FakePreemptionScorePostFilterPlugin struct{}
func (pl *FakePreemptionScorePostFilterPlugin) SelectVictimsOnNode(
ctx context.Context, state *framework.CycleState, pod *v1.Pod,
nodeInfo *framework.NodeInfo, pdbs []*policy.PodDisruptionBudget) (victims []*v1.Pod, numViolatingVictim int, status *framework.Status) {
return append(victims, nodeInfo.Pods[0].Pod), 1, nil
}
func (pl *FakePreemptionScorePostFilterPlugin) GetOffsetAndNumCandidates(nodes int32) (int32, int32) {
return 0, nodes
}
func (pl *FakePreemptionScorePostFilterPlugin) CandidatesToVictimsMap(candidates []Candidate) map[string]*extenderv1.Victims {
m := make(map[string]*extenderv1.Victims, len(candidates))
for _, c := range candidates {
m[c.Name()] = c.Victims()
}
return m
}
func (pl *FakePreemptionScorePostFilterPlugin) PodEligibleToPreemptOthers(pod *v1.Pod, nominatedNodeStatus *framework.Status) (bool, string) {
return true, ""
}
func (pl *FakePreemptionScorePostFilterPlugin) OrderedScoreFuncs(ctx context.Context, nodesToVictims map[string]*extenderv1.Victims) []func(node string) int64 {
return []func(string) int64{
func(node string) int64 {
var sumContainers int64
for _, pod := range nodesToVictims[node].Pods {
sumContainers += int64(len(pod.Spec.Containers) + len(pod.Spec.InitContainers))
}
// The smaller the sumContainers, the higher the score.
return -sumContainers
},
}
}
func TestNodesWherePreemptionMightHelp(t *testing.T) {
// Prepare 4 nodes names.
nodeNames := []string{"node1", "node2", "node3", "node4"}
@@ -337,3 +378,100 @@ func TestDryRunPreemption(t *testing.T) {
})
}
}
func TestSelectCandidate(t *testing.T) {
tests := []struct {
name string
nodeNames []string
pod *v1.Pod
testPods []*v1.Pod
expected string
}{
{
name: "pod has different number of containers on each node",
nodeNames: []string{"node1", "node2", "node3"},
pod: st.MakePod().Name("p").UID("p").Priority(highPriority).Req(veryLargeRes).Obj(),
testPods: []*v1.Pod{
st.MakePod().Name("p1.1").UID("p1.1").Node("node1").Priority(midPriority).Containers([]v1.Container{
st.MakeContainer().Name("container1").Obj(),
st.MakeContainer().Name("container2").Obj(),
}).Obj(),
st.MakePod().Name("p2.1").UID("p2.1").Node("node2").Priority(midPriority).Containers([]v1.Container{
st.MakeContainer().Name("container1").Obj(),
}).Obj(),
st.MakePod().Name("p3.1").UID("p3.1").Node("node3").Priority(midPriority).Containers([]v1.Container{
st.MakeContainer().Name("container1").Obj(),
st.MakeContainer().Name("container2").Obj(),
st.MakeContainer().Name("container3").Obj(),
}).Obj(),
},
expected: "node2",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
nodes := make([]*v1.Node, len(tt.nodeNames))
for i, nodeName := range tt.nodeNames {
nodes[i] = st.MakeNode().Name(nodeName).Capacity(veryLargeRes).Obj()
}
registeredPlugins := append([]tf.RegisterPluginFunc{
tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New)},
tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
)
var objs []runtime.Object
objs = append(objs, tt.pod)
for _, pod := range tt.testPods {
objs = append(objs, pod)
}
informerFactory := informers.NewSharedInformerFactory(clientsetfake.NewSimpleClientset(objs...), 0)
snapshot := internalcache.NewSnapshot(tt.testPods, nodes)
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
fwk, err := tf.NewFramework(
ctx,
registeredPlugins,
"",
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())),
frameworkruntime.WithSnapshotSharedLister(snapshot),
frameworkruntime.WithLogger(logger),
)
if err != nil {
t.Fatal(err)
}
state := framework.NewCycleState()
// Some tests rely on PreFilter plugin to compute its CycleState.
if _, status := fwk.RunPreFilterPlugins(ctx, state, tt.pod); !status.IsSuccess() {
t.Errorf("Unexpected PreFilter Status: %v", status)
}
nodeInfos, err := snapshot.NodeInfos().List()
if err != nil {
t.Fatal(err)
}
fakePreemptionScorePostFilterPlugin := &FakePreemptionScorePostFilterPlugin{}
for _, pod := range tt.testPods {
state := framework.NewCycleState()
pe := Evaluator{
PluginName: "FakePreemptionScorePostFilter",
Handler: fwk,
Interface: fakePreemptionScorePostFilterPlugin,
State: state,
}
candidates, _, _ := pe.DryRunPreemption(context.Background(), pod, nodeInfos, nil, 0, int32(len(nodeInfos)))
s := pe.SelectCandidate(ctx, candidates)
if s == nil || len(s.Name()) == 0 {
t.Errorf("expect any node in %v, but no candidate selected", tt.expected)
return
}
if diff := cmp.Diff(tt.expected, s.Name()); diff != "" {
t.Errorf("expect any node in %v, but got %v", tt.expected, s.Name())
}
}
})
}
}