take PodTopologySpread into consideration when requeueing Pods based on Pod related events

This commit is contained in:
Kensei Nakada
2024-01-07 04:05:34 +00:00
parent 8d0ee91fc7
commit 533140f065
4 changed files with 100 additions and 11 deletions

View File

@@ -205,7 +205,20 @@ func (sched *Scheduler) addPodToCache(obj interface{}) {
logger.Error(err, "Scheduler cache AddPod failed", "pod", klog.KObj(pod))
}
sched.SchedulingQueue.AssignedPodAdded(logger, pod)
// SchedulingQueue.AssignedPodAdded has a problem:
// It internally pre-filters Pods to move to activeQ,
// while taking only in-tree plugins into consideration.
// Consequently, if custom plugins that subscribe to Pod/Add events reject Pods,
// those Pods will never be requeued to activeQ by assigned-Pod-related events,
// and they may be stuck in unschedulableQ.
//
// Here we use MoveAllToActiveOrBackoffQueue only when QueueingHint is enabled.
// (We cannot switch to MoveAllToActiveOrBackoffQueue right away because of throughput concern.)
if utilfeature.DefaultFeatureGate.Enabled(features.SchedulerQueueingHints) {
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, queue.AssignedPodAdd, nil, pod, nil)
} else {
sched.SchedulingQueue.AssignedPodAdded(logger, pod)
}
}
func (sched *Scheduler) updatePodInCache(oldObj, newObj interface{}) {
@@ -226,7 +239,20 @@ func (sched *Scheduler) updatePodInCache(oldObj, newObj interface{}) {
logger.Error(err, "Scheduler cache UpdatePod failed", "pod", klog.KObj(oldPod))
}
sched.SchedulingQueue.AssignedPodUpdated(logger, oldPod, newPod)
// SchedulingQueue.AssignedPodUpdated has a problem:
// It internally pre-filters Pods to move to activeQ,
// while taking only in-tree plugins into consideration.
// Consequently, if custom plugins that subscribe to Pod/Update events reject Pods,
// those Pods will never be requeued to activeQ by assigned-Pod-related events,
// and they may be stuck in unschedulableQ.
//
// Here we use MoveAllToActiveOrBackoffQueue only when QueueingHint is enabled.
// (We cannot switch to MoveAllToActiveOrBackoffQueue right away because of throughput concern.)
if utilfeature.DefaultFeatureGate.Enabled(features.SchedulerQueueingHints) {
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, queue.AssignedPodUpdate, oldPod, newPod, nil)
} else {
sched.SchedulingQueue.AssignedPodUpdated(logger, oldPod, newPod)
}
}
func (sched *Scheduler) deletePodFromCache(obj interface{}) {

View File

@@ -48,6 +48,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/interpodaffinity"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/podtopologyspread"
"k8s.io/kubernetes/pkg/scheduler/internal/heap"
"k8s.io/kubernetes/pkg/scheduler/metrics"
"k8s.io/kubernetes/pkg/scheduler/util"
@@ -1100,7 +1101,10 @@ func (p *PriorityQueue) Delete(pod *v1.Pod) error {
// may make pending pods with matching affinity terms schedulable.
func (p *PriorityQueue) AssignedPodAdded(logger klog.Logger, pod *v1.Pod) {
p.lock.Lock()
p.movePodsToActiveOrBackoffQueue(logger, p.getUnschedulablePodsWithMatchingAffinityTerm(logger, pod), AssignedPodAdd, nil, pod)
// Pre-filter Pods to move by getUnschedulablePodsWithCrossTopologyTerm
// because Pod related events shouldn't make Pods that were rejected by a single-node scheduling requirement schedulable.
p.movePodsToActiveOrBackoffQueue(logger, p.getUnschedulablePodsWithCrossTopologyTerm(logger, pod), AssignedPodAdd, nil, pod)
p.lock.Unlock()
}
@@ -1123,9 +1127,13 @@ func isPodResourcesResizedDown(pod *v1.Pod) bool {
func (p *PriorityQueue) AssignedPodUpdated(logger klog.Logger, oldPod, newPod *v1.Pod) {
p.lock.Lock()
if isPodResourcesResizedDown(newPod) {
// In this case, we don't want to pre-filter Pods with getUnschedulablePodsWithCrossTopologyTerm
// because Pod related events may make Pods that were rejected by NodeResourceFit schedulable.
p.moveAllToActiveOrBackoffQueue(logger, AssignedPodUpdate, oldPod, newPod, nil)
} else {
p.movePodsToActiveOrBackoffQueue(logger, p.getUnschedulablePodsWithMatchingAffinityTerm(logger, newPod), AssignedPodUpdate, oldPod, newPod)
// Pre-filter Pods to move by getUnschedulablePodsWithCrossTopologyTerm
// because Pod related events only make Pods rejected by a cross-topology term schedulable.
p.movePodsToActiveOrBackoffQueue(logger, p.getUnschedulablePodsWithCrossTopologyTerm(logger, newPod), AssignedPodUpdate, oldPod, newPod)
}
p.lock.Unlock()
}
@@ -1266,22 +1274,29 @@ func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(logger klog.Logger, podIn
}
}
// getUnschedulablePodsWithMatchingAffinityTerm returns unschedulable pods which have
// any affinity term that matches "pod".
// getUnschedulablePodsWithCrossTopologyTerm returns unschedulable pods for which either of the following conditions is met:
// - they have any affinity term that matches "pod".
// - they were rejected by the PodTopologySpread plugin.
// NOTE: this function assumes lock has been acquired in caller.
func (p *PriorityQueue) getUnschedulablePodsWithMatchingAffinityTerm(logger klog.Logger, pod *v1.Pod) []*framework.QueuedPodInfo {
func (p *PriorityQueue) getUnschedulablePodsWithCrossTopologyTerm(logger klog.Logger, pod *v1.Pod) []*framework.QueuedPodInfo {
nsLabels := interpodaffinity.GetNamespaceLabelsSnapshot(logger, pod.Namespace, p.nsLister)
var podsToMove []*framework.QueuedPodInfo
for _, pInfo := range p.unschedulablePods.podInfoMap {
if pInfo.UnschedulablePlugins.Has(podtopologyspread.Name) {
// This Pod may become schedulable as a result of this Pod event.
podsToMove = append(podsToMove, pInfo)
continue
}
for _, term := range pInfo.RequiredAffinityTerms {
if term.Matches(pod, nsLabels) {
podsToMove = append(podsToMove, pInfo)
break
}
}
}
return podsToMove
}

View File

@@ -45,6 +45,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/framework"
plfeature "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/podtopologyspread"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/schedulinggates"
"k8s.io/kubernetes/pkg/scheduler/metrics"
@@ -1938,6 +1939,7 @@ func TestPriorityQueue_AssignedPodAdded(t *testing.T) {
defer cancel()
affinityPod := st.MakePod().Name("afp").Namespace("ns1").UID("afp").Annotation("annot2", "val2").Priority(mediumPriority).NominatedNodeName("node1").PodAffinityExists("service", "region", st.PodAffinityWithRequiredReq).Obj()
spreadPod := st.MakePod().Name("tsp").Namespace("ns1").UID("tsp").SpreadConstraint(1, "node", v1.DoNotSchedule, nil, nil, nil, nil, nil).Obj()
labelPod := st.MakePod().Name("lbp").Namespace(affinityPod.Namespace).Label("service", "securityscan").Node("node1").Obj()
c := testingclock.NewFakeClock(time.Now())
@@ -1947,17 +1949,31 @@ func TestPriorityQueue_AssignedPodAdded(t *testing.T) {
PluginName: "fakePlugin",
QueueingHintFn: queueHintReturnQueue,
},
{
PluginName: podtopologyspread.Name,
QueueingHintFn: queueHintReturnQueue,
},
}
q := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(c), WithQueueingHintMapPerProfile(m))
// To simulate the pod is failed in scheduling in the real world, Pop() the pod from activeQ before AddUnschedulableIfNotPresent()s below.
q.activeQ.Add(q.newQueuedPodInfo(unschedulablePodInfo.Pod))
if err := q.activeQ.Add(q.newQueuedPodInfo(unschedulablePodInfo.Pod)); err != nil {
t.Errorf("failed to add pod to activeQ: %v", err)
}
if p, err := q.Pop(logger); err != nil || p.Pod != unschedulablePodInfo.Pod {
t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePodInfo.Pod.Name, p.Pod.Name)
}
q.activeQ.Add(q.newQueuedPodInfo(affinityPod))
if err := q.activeQ.Add(q.newQueuedPodInfo(affinityPod)); err != nil {
t.Errorf("failed to add pod to activeQ: %v", err)
}
if p, err := q.Pop(logger); err != nil || p.Pod != affinityPod {
t.Errorf("Expected: %v after Pop, but got: %v", affinityPod.Name, p.Pod.Name)
}
if err := q.activeQ.Add(q.newQueuedPodInfo(spreadPod)); err != nil {
t.Errorf("failed to add pod to activeQ: %v", err)
}
if p, err := q.Pop(logger); err != nil || p.Pod != spreadPod {
t.Errorf("Expected: %v after Pop, but got: %v", spreadPod.Name, p.Pod.Name)
}
q.Add(logger, medPriorityPodInfo.Pod)
err := q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(unschedulablePodInfo.Pod, "fakePlugin"), q.SchedulingCycle())
if err != nil {
@@ -1967,18 +1983,30 @@ func TestPriorityQueue_AssignedPodAdded(t *testing.T) {
if err != nil {
t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err)
}
err = q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(spreadPod, podtopologyspread.Name), q.SchedulingCycle())
if err != nil {
t.Fatalf("unexpected error from AddUnschedulableIfNotPresent: %v", err)
}
// Move clock to make the unschedulable pods complete backoff.
c.Step(DefaultPodInitialBackoffDuration + time.Second)
// Simulate addition of an assigned pod. The pod has matching labels for
// affinityPod. So, affinityPod should go to activeQ.
// affinityPod.
// - affinityPod should go to activeQ.
// - spreadPod should go to activeQ because it's rejected by podtopologyspread.
q.AssignedPodAdded(logger, labelPod)
if getUnschedulablePod(q, affinityPod) != nil {
t.Error("affinityPod is still in the unschedulablePods.")
}
if getUnschedulablePod(q, spreadPod) != nil {
t.Error("spreadPod is still in the unschedulablePods.")
}
if _, exists, _ := q.activeQ.Get(newQueuedPodInfoForLookup(affinityPod)); !exists {
t.Error("affinityPod is not moved to activeQ.")
}
if _, exists, _ := q.activeQ.Get(newQueuedPodInfoForLookup(spreadPod)); !exists {
t.Error("spreadPod is not moved to activeQ.")
}
// Check that the other pod is still in the unschedulablePods.
if getUnschedulablePod(q, unschedulablePodInfo.Pod) == nil {
t.Error("unschedulablePodInfo is not in the unschedulablePods.")

View File

@@ -53,6 +53,7 @@ import (
testutils "k8s.io/kubernetes/test/integration/util"
imageutils "k8s.io/kubernetes/test/utils/image"
"k8s.io/utils/pointer"
"k8s.io/utils/ptr"
)
func TestSchedulingGates(t *testing.T) {
@@ -362,6 +363,25 @@ func TestCoreResourceEnqueue(t *testing.T) {
},
wantRequeuedPods: sets.New("pod1"),
},
{
name: "Pods with PodTopologySpread should be requeued when a Pod with matching label is scheduled",
initialNodes: []*v1.Node{st.MakeNode().Name("fake-node").Label("node", "fake-node").Capacity(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Obj()},
initialPod: st.MakePod().Name("pod1").Label("key", "val").Container("image").Node("fake-node").Obj(),
pods: []*v1.Pod{
// - Pod2 will be rejected by the PodTopologySpread plugin.
st.MakePod().Name("pod2").Label("key", "val").SpreadConstraint(1, "node", v1.DoNotSchedule, st.MakeLabelSelector().Exists("key").Obj(), ptr.To(int32(3)), nil, nil, nil).Container("image").Obj(),
},
triggerFn: func(testCtx *testutils.TestContext) error {
// Trigger an assigned Pod add event.
pod := st.MakePod().Name("pod3").Label("key", "val").Node("fake-node").Container("image").Obj()
if _, err := testCtx.ClientSet.CoreV1().Pods(testCtx.NS.Name).Create(testCtx.Ctx, pod, metav1.CreateOptions{}); err != nil {
return fmt.Errorf("failed to create Pod %q: %w", pod.Name, err)
}
return nil
},
wantRequeuedPods: sets.New("pod2"),
},
}
for _, tt := range tests {