address review suggestions

This commit is contained in:
Kensei Nakada 2024-06-28 00:15:43 +00:00
parent 533140f065
commit e16aa35865
2 changed files with 88 additions and 75 deletions

View File

@ -1127,8 +1127,8 @@ func isPodResourcesResizedDown(pod *v1.Pod) bool {
func (p *PriorityQueue) AssignedPodUpdated(logger klog.Logger, oldPod, newPod *v1.Pod) { func (p *PriorityQueue) AssignedPodUpdated(logger klog.Logger, oldPod, newPod *v1.Pod) {
p.lock.Lock() p.lock.Lock()
if isPodResourcesResizedDown(newPod) { if isPodResourcesResizedDown(newPod) {
// This case, we don't want to pre-filter Pods by getUnschedulablePodsWithCrossTopologyTerm // In this case, we don't want to pre-filter Pods by getUnschedulablePodsWithCrossTopologyTerm
// because Pod related events maybe make Pods that rejected by NodeResourceFit schedulable. // because Pod related events may make Pods that were rejected by NodeResourceFit schedulable.
p.moveAllToActiveOrBackoffQueue(logger, AssignedPodUpdate, oldPod, newPod, nil) p.moveAllToActiveOrBackoffQueue(logger, AssignedPodUpdate, oldPod, newPod, nil)
} else { } else {
// Pre-filter Pods to move by getUnschedulablePodsWithCrossTopologyTerm // Pre-filter Pods to move by getUnschedulablePodsWithCrossTopologyTerm
@ -1283,7 +1283,7 @@ func (p *PriorityQueue) getUnschedulablePodsWithCrossTopologyTerm(logger klog.Lo
var podsToMove []*framework.QueuedPodInfo var podsToMove []*framework.QueuedPodInfo
for _, pInfo := range p.unschedulablePods.podInfoMap { for _, pInfo := range p.unschedulablePods.podInfoMap {
if pInfo.UnschedulablePlugins.Has(podtopologyspread.Name) { if pInfo.UnschedulablePlugins.Has(podtopologyspread.Name) && pod.Namespace == pInfo.Pod.Namespace {
// This Pod may be schedulable now by this Pod event. // This Pod may be schedulable now by this Pod event.
podsToMove = append(podsToMove, pInfo) podsToMove = append(podsToMove, pInfo)
continue continue

View File

@ -45,7 +45,6 @@ import (
"k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework"
plfeature "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" plfeature "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/podtopologyspread"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/schedulinggates" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/schedulinggates"
"k8s.io/kubernetes/pkg/scheduler/metrics" "k8s.io/kubernetes/pkg/scheduler/metrics"
@ -1933,83 +1932,97 @@ func expectInFlightPods(t *testing.T, q *PriorityQueue, uids ...types.UID) {
// TestPriorityQueue_AssignedPodAdded tests AssignedPodAdded. It checks that // TestPriorityQueue_AssignedPodAdded tests AssignedPodAdded. It checks that
// when a pod with pod affinity is in unschedulablePods and another pod with a // when a pod with pod affinity is in unschedulablePods and another pod with a
// matching label is added, the unschedulable pod is moved to activeQ. // matching label is added, the unschedulable pod is moved to activeQ.
func TestPriorityQueue_AssignedPodAdded(t *testing.T) { func TestPriorityQueue_AssignedPodAdded_(t *testing.T) {
logger, ctx := ktesting.NewTestContext(t) tests := []struct {
ctx, cancel := context.WithCancel(ctx) name string
defer cancel() unschedPod *v1.Pod
unschedPlugin string
affinityPod := st.MakePod().Name("afp").Namespace("ns1").UID("afp").Annotation("annot2", "val2").Priority(mediumPriority).NominatedNodeName("node1").PodAffinityExists("service", "region", st.PodAffinityWithRequiredReq).Obj() updatedAssignedPod *v1.Pod
spreadPod := st.MakePod().Name("tsp").Namespace("ns1").UID("tsp").SpreadConstraint(1, "node", v1.DoNotSchedule, nil, nil, nil, nil, nil).Obj() wantToRequeue bool
labelPod := st.MakePod().Name("lbp").Namespace(affinityPod.Namespace).Label("service", "securityscan").Node("node1").Obj() }{
c := testingclock.NewFakeClock(time.Now())
m := makeEmptyQueueingHintMapPerProfile()
m[""][AssignedPodAdd] = []*QueueingHintFunction{
{ {
PluginName: "fakePlugin", name: "Pod rejected by pod affinity is requeued with matching Pod's update",
QueueingHintFn: queueHintReturnQueue, unschedPod: st.MakePod().Name("afp").Namespace("ns1").UID("afp").Annotation("annot2", "val2").PodAffinityExists("service", "region", st.PodAffinityWithRequiredReq).Obj(),
unschedPlugin: names.InterPodAffinity,
updatedAssignedPod: st.MakePod().Name("lbp").Namespace("ns1").Label("service", "securityscan").Node("node1").Obj(),
wantToRequeue: true,
}, },
{ {
PluginName: podtopologyspread.Name, name: "Pod rejected by pod affinity isn't requeued with unrelated Pod's update",
QueueingHintFn: queueHintReturnQueue, unschedPod: st.MakePod().Name("afp").Namespace("ns1").UID("afp").Annotation("annot2", "val2").PodAffinityExists("service", "region", st.PodAffinityWithRequiredReq).Obj(),
unschedPlugin: names.InterPodAffinity,
updatedAssignedPod: st.MakePod().Name("lbp").Namespace("unrelated").Label("unrelated", "unrelated").Node("node1").Obj(),
wantToRequeue: false,
},
{
name: "Pod rejected by pod topology spread is requeued with Pod's update in the same namespace",
unschedPod: st.MakePod().Name("tsp").Namespace("ns1").UID("tsp").SpreadConstraint(1, "node", v1.DoNotSchedule, nil, nil, nil, nil, nil).Obj(),
unschedPlugin: names.PodTopologySpread,
updatedAssignedPod: st.MakePod().Name("lbp").Namespace("ns1").Label("service", "securityscan").Node("node1").Obj(),
wantToRequeue: true,
},
{
name: "Pod rejected by pod topology spread isn't requeued with unrelated Pod's update",
unschedPod: st.MakePod().Name("afp").Namespace("ns1").UID("afp").Annotation("annot2", "val2").PodAffinityExists("service", "region", st.PodAffinityWithRequiredReq).Obj(),
unschedPlugin: names.PodTopologySpread,
updatedAssignedPod: st.MakePod().Name("lbp").Namespace("unrelated").Label("unrelated", "unrelated").Node("node1").Obj(),
wantToRequeue: false,
},
{
name: "Pod rejected by other plugins isn't requeued with any Pod's update",
unschedPod: st.MakePod().Name("afp").Namespace("ns1").UID("afp").Annotation("annot2", "val2").Obj(),
unschedPlugin: "fakePlugin",
updatedAssignedPod: st.MakePod().Name("lbp").Namespace("unrelated").Label("unrelated", "unrelated").Node("node1").Obj(),
wantToRequeue: false,
}, },
} }
q := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(c), WithQueueingHintMapPerProfile(m))
// To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent()s below.
if err := q.activeQ.Add(q.newQueuedPodInfo(unschedulablePodInfo.Pod)); err != nil {
t.Errorf("failed to add pod to activeQ: %v", err)
}
if p, err := q.Pop(logger); err != nil || p.Pod != unschedulablePodInfo.Pod {
t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePodInfo.Pod.Name, p.Pod.Name)
}
if err := q.activeQ.Add(q.newQueuedPodInfo(affinityPod)); err != nil {
t.Errorf("failed to add pod to activeQ: %v", err)
}
if p, err := q.Pop(logger); err != nil || p.Pod != affinityPod {
t.Errorf("Expected: %v after Pop, but got: %v", affinityPod.Name, p.Pod.Name)
}
if err := q.activeQ.Add(q.newQueuedPodInfo(spreadPod)); err != nil {
t.Errorf("failed to add pod to activeQ: %v", err)
}
if p, err := q.Pop(logger); err != nil || p.Pod != spreadPod {
t.Errorf("Expected: %v after Pop, but got: %v", affinityPod.Name, p.Pod.Name)
}
q.Add(logger, medPriorityPodInfo.Pod)
err := q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(unschedulablePodInfo.Pod, "fakePlugin"), q.SchedulingCycle())
if err != nil {
t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err)
}
err = q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(affinityPod, "fakePlugin"), q.SchedulingCycle())
if err != nil {
t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err)
}
err = q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(spreadPod, podtopologyspread.Name), q.SchedulingCycle())
if err != nil {
t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err)
}
// Move clock to make the unschedulable pods complete backoff. for _, tt := range tests {
c.Step(DefaultPodInitialBackoffDuration + time.Second) t.Run(tt.name, func(t *testing.T) {
// Simulate addition of an assigned pod. The pod has matching labels for logger, ctx := ktesting.NewTestContext(t)
// affinityPod. ctx, cancel := context.WithCancel(ctx)
// - affinityPod should go to activeQ. defer cancel()
// - spreadPod should go to activeQ because it's rejected by podtopologyspread.
q.AssignedPodAdded(logger, labelPod) c := testingclock.NewFakeClock(time.Now())
if getUnschedulablePod(q, affinityPod) != nil { m := makeEmptyQueueingHintMapPerProfile()
t.Error("affinityPod is still in the unschedulablePods.") m[""][AssignedPodAdd] = []*QueueingHintFunction{
} {
if getUnschedulablePod(q, spreadPod) != nil { PluginName: "fakePlugin",
t.Error("spreadPod is still in the unschedulablePods.") QueueingHintFn: queueHintReturnQueue,
} },
if _, exists, _ := q.activeQ.Get(newQueuedPodInfoForLookup(affinityPod)); !exists { {
t.Error("affinityPod is not moved to activeQ.") PluginName: names.InterPodAffinity,
} QueueingHintFn: queueHintReturnQueue,
if _, exists, _ := q.activeQ.Get(newQueuedPodInfoForLookup(spreadPod)); !exists { },
t.Error("spreadPod is not moved to activeQ.") {
} PluginName: names.PodTopologySpread,
// Check that the other pod is still in the unschedulablePods. QueueingHintFn: queueHintReturnQueue,
if getUnschedulablePod(q, unschedulablePodInfo.Pod) == nil { },
t.Error("unschedulablePodInfo is not in the unschedulablePods.") }
q := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(c), WithQueueingHintMapPerProfile(m))
// To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent()s below.
if err := q.activeQ.Add(q.newQueuedPodInfo(tt.unschedPod)); err != nil {
t.Errorf("failed to add pod to activeQ: %v", err)
}
if p, err := q.Pop(logger); err != nil || p.Pod != tt.unschedPod {
t.Errorf("Expected: %v after Pop, but got: %v", tt.unschedPod.Name, p.Pod.Name)
}
err := q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(tt.unschedPod, tt.unschedPlugin), q.SchedulingCycle())
if err != nil {
t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err)
}
// Move clock to make the unschedulable pods complete backoff.
c.Step(DefaultPodInitialBackoffDuration + time.Second)
q.AssignedPodAdded(logger, tt.updatedAssignedPod)
if _, exists, _ := q.activeQ.Get(newQueuedPodInfoForLookup(tt.unschedPod)); exists != tt.wantToRequeue {
t.Fatalf("unexpected Pod move: Pod should be requeued: %v. Pod is actually requeued: %v", tt.wantToRequeue, exists)
}
})
} }
} }