Trigger rescheduling on delete events also when an unscheduled pod is removed

dom4ha 2024-12-02 11:16:05 +00:00
parent 4806519423
commit 4deb4f2b5f
10 changed files with 372 additions and 43 deletions


@@ -160,6 +160,16 @@ func (sched *Scheduler) updatePodInSchedulingQueue(oldObj, newObj interface{}) {
logger.V(4).Info("Update event for unscheduled pod", "pod", klog.KObj(newPod))
sched.SchedulingQueue.Update(logger, oldPod, newPod)
if hasNominatedNodeNameChanged(oldPod, newPod) {
// The nominated node changed for the pod, so we need to treat it as if the pod was deleted from the old nominated node,
// because the scheduler treats such a pod as if it was already assigned when scheduling lower or equal priority pods.
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, framework.EventAssignedPodDelete, oldPod, nil, getLEPriorityPreCheck(corev1helpers.PodPriority(oldPod)))
}
}
// hasNominatedNodeNameChanged returns true when the nominated node name was set and has changed.
func hasNominatedNodeNameChanged(oldPod, newPod *v1.Pod) bool {
return len(oldPod.Status.NominatedNodeName) > 0 && oldPod.Status.NominatedNodeName != newPod.Status.NominatedNodeName
}
func (sched *Scheduler) deletePodFromSchedulingQueue(obj interface{}) {
@@ -195,8 +205,21 @@ func (sched *Scheduler) deletePodFromSchedulingQueue(obj interface{}) {
// If a waiting pod is rejected, it indicates it's previously assumed and we're
// removing it from the scheduler cache. In this case, signal an AssignedPodDelete
// event to immediately retry some unscheduled Pods.
// Similarly, when a pod that had a nominated node is deleted, it can unblock scheduling of other pods,
// because lower or equal priority pods treat such a pod as if it was assigned.
if fwk.RejectWaitingPod(pod.UID) {
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, framework.EventAssignedPodDelete, pod, nil, nil)
} else if pod.Status.NominatedNodeName != "" {
// Note that a nominated pod can fall into `RejectWaitingPod` case as well,
// but in that case the `MoveAllToActiveOrBackoffQueue` already covered lower priority pods.
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, framework.EventAssignedPodDelete, pod, nil, getLEPriorityPreCheck(corev1helpers.PodPriority(pod)))
}
}
// getLEPriorityPreCheck is a PreEnqueueCheck function that selects only lower or equal priority pods.
func getLEPriorityPreCheck(priority int32) queue.PreEnqueueCheck {
return func(pod *v1.Pod) bool {
return corev1helpers.PodPriority(pod) <= priority
}
}
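
As a side note, the getLEPriorityPreCheck added above can be pictured in isolation. Below is a minimal, self-contained Go sketch (not part of this commit; the package layout, helper names, and sample pods are illustrative) of how a lower-or-equal-priority pre-check narrows which unschedulable pods get moved back for another scheduling attempt once a nominated pod is deleted or loses its nomination:

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	corev1helpers "k8s.io/component-helpers/scheduling/corev1"
)

// preEnqueueCheck mirrors the shape of queue.PreEnqueueCheck: return true to move the pod.
type preEnqueueCheck func(pod *v1.Pod) bool

// lePriorityPreCheck selects only pods whose priority is lower than or equal to the
// priority of the deleted (or un-nominated) pod, like getLEPriorityPreCheck above.
func lePriorityPreCheck(priority int32) preEnqueueCheck {
	return func(pod *v1.Pod) bool {
		return corev1helpers.PodPriority(pod) <= priority
	}
}

func makePod(name string, priority int32) *v1.Pod {
	return &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: name},
		Spec:       v1.PodSpec{Priority: &priority},
	}
}

func main() {
	// Priority of the nominated pod that just went away.
	check := lePriorityPreCheck(200)

	unschedulable := []*v1.Pod{makePod("high", 300), makePod("mid", 200), makePod("low", 100)}
	for _, p := range unschedulable {
		// Only "mid" and "low" are retried; "high" never treated the nominated pod
		// as occupying the node, so it gains nothing from the deletion.
		fmt.Printf("%s -> move=%v\n", p.Name, check(p))
	}
}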


@@ -18,6 +18,7 @@ package scheduler
import (
"context"
"fmt"
"reflect"
"testing"
"time"
@@ -30,8 +31,10 @@ import (
resourceapi "k8s.io/api/resource/v1beta1"
storagev1 "k8s.io/api/storage/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/klog/v2"
"k8s.io/klog/v2/ktesting"
"k8s.io/apimachinery/pkg/runtime"
@@ -42,18 +45,168 @@ import (
"k8s.io/client-go/kubernetes/fake"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler/backend/cache"
internalcache "k8s.io/kubernetes/pkg/scheduler/backend/cache"
"k8s.io/kubernetes/pkg/scheduler/backend/queue"
internalqueue "k8s.io/kubernetes/pkg/scheduler/backend/queue"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeaffinity"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodename"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeports"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort"
"k8s.io/kubernetes/pkg/scheduler/metrics"
st "k8s.io/kubernetes/pkg/scheduler/testing"
"k8s.io/kubernetes/pkg/scheduler/util"
"k8s.io/kubernetes/pkg/scheduler/util/assumecache"
)
func TestEventHandlers_MoveToActiveOnNominatedNodeUpdate(t *testing.T) {
metrics.Register()
highPriorityPod :=
st.MakePod().Name("hpp").Namespace("ns1").UID("hppns1").Priority(highPriority).SchedulerName(testSchedulerName).Obj()
medNominatedPriorityPod :=
st.MakePod().Name("mpp").Namespace("ns2").UID("mppns1").Priority(midPriority).SchedulerName(testSchedulerName).NominatedNodeName("node1").Obj()
medPriorityPod :=
st.MakePod().Name("smpp").Namespace("ns3").UID("mppns2").Priority(midPriority).SchedulerName(testSchedulerName).Obj()
lowPriorityPod :=
st.MakePod().Name("lpp").Namespace("ns4").UID("lppns1").Priority(lowPriority).SchedulerName(testSchedulerName).Obj()
unschedulablePods := []*v1.Pod{highPriorityPod, medNominatedPriorityPod, medPriorityPod, lowPriorityPod}
// With QHints enabled, make pods schedulable on a Delete event, but not when the event merely adds a nominated node name (i.e., the old pod had none).
queueHintForPodDelete := func(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
oldPod, _, err := util.As[*v1.Pod](oldObj, newObj)
if err != nil {
t.Errorf("Failed to convert objects to pods: %v", err)
}
if oldPod.Status.NominatedNodeName == "" {
return framework.QueueSkip, nil
}
return framework.Queue, nil
}
queueingHintMap := internalqueue.QueueingHintMapPerProfile{
testSchedulerName: {
framework.EventAssignedPodDelete: {
{
PluginName: "fooPlugin1",
QueueingHintFn: queueHintForPodDelete,
},
},
},
}
tests := []struct {
name string
updateFunc func(s *Scheduler)
wantInActive sets.Set[string]
}{
{
name: "Update of a nominated node name to a different value should trigger rescheduling of lower priority pods",
updateFunc: func(s *Scheduler) {
updatedPod := medNominatedPriorityPod.DeepCopy()
updatedPod.Status.NominatedNodeName = "node2"
updatedPod.ResourceVersion = "1"
s.updatePodInSchedulingQueue(medNominatedPriorityPod, updatedPod)
},
wantInActive: sets.New(lowPriorityPod.Name, medPriorityPod.Name, medNominatedPriorityPod.Name),
},
{
name: "Removal of a nominated node name should trigger rescheduling of lower priority pods",
updateFunc: func(s *Scheduler) {
updatedPod := medNominatedPriorityPod.DeepCopy()
updatedPod.Status.NominatedNodeName = ""
updatedPod.ResourceVersion = "1"
s.updatePodInSchedulingQueue(medNominatedPriorityPod, updatedPod)
},
wantInActive: sets.New(lowPriorityPod.Name, medPriorityPod.Name, medNominatedPriorityPod.Name),
},
{
name: "Removal of a pod that had nominated node name should trigger rescheduling of lower priority pods",
updateFunc: func(s *Scheduler) {
s.deletePodFromSchedulingQueue(medNominatedPriorityPod)
},
wantInActive: sets.New(lowPriorityPod.Name, medPriorityPod.Name),
},
{
name: "Addition of a nominated node name to the high priority pod that did not have it before shouldn't trigger rescheduling",
updateFunc: func(s *Scheduler) {
updatedPod := highPriorityPod.DeepCopy()
updatedPod.Status.NominatedNodeName = "node2"
updatedPod.ResourceVersion = "1"
s.updatePodInSchedulingQueue(highPriorityPod, updatedPod)
},
wantInActive: sets.New[string](),
},
}
for _, tt := range tests {
for _, qHintEnabled := range []bool{false, true} {
t.Run(fmt.Sprintf("%s, with queuehint(%v)", tt.name, qHintEnabled), func(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SchedulerQueueingHints, qHintEnabled)
logger, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
var objs []runtime.Object
for _, pod := range unschedulablePods {
objs = append(objs, pod)
}
client := fake.NewClientset(objs...)
informerFactory := informers.NewSharedInformerFactory(client, 0)
recorder := metrics.NewMetricsAsyncRecorder(3, 20*time.Microsecond, ctx.Done())
queue := internalqueue.NewPriorityQueue(
newDefaultQueueSort(),
informerFactory,
internalqueue.WithMetricsRecorder(*recorder),
internalqueue.WithQueueingHintMapPerProfile(queueingHintMap),
// disable backoff queue
internalqueue.WithPodInitialBackoffDuration(0),
internalqueue.WithPodMaxBackoffDuration(0))
schedulerCache := internalcache.New(ctx, 30*time.Second)
// Put test pods into unschedulable queue
for _, pod := range unschedulablePods {
queue.Add(logger, pod)
poppedPod, err := queue.Pop(logger)
if err != nil {
t.Fatalf("Pop failed: %v", err)
}
poppedPod.UnschedulablePlugins = sets.New("fooPlugin1")
if err := queue.AddUnschedulableIfNotPresent(logger, poppedPod, queue.SchedulingCycle()); err != nil {
t.Errorf("Unexpected error from AddUnschedulableIfNotPresent: %v", err)
}
}
s, _, err := initScheduler(ctx, schedulerCache, queue, client, informerFactory)
if err != nil {
t.Fatalf("Failed to initialize test scheduler: %v", err)
}
if len(s.SchedulingQueue.PodsInActiveQ()) > 0 {
t.Errorf("No pods were expected to be in the activeQ before the update, but there were %v", s.SchedulingQueue.PodsInActiveQ())
}
tt.updateFunc(s)
if len(s.SchedulingQueue.PodsInActiveQ()) != len(tt.wantInActive) {
t.Errorf("Different number of pods were expected to be in the activeQ, but found actual %v vs. expected %v", s.SchedulingQueue.PodsInActiveQ(), tt.wantInActive)
}
for _, pod := range s.SchedulingQueue.PodsInActiveQ() {
if !tt.wantInActive.Has(pod.Name) {
t.Errorf("Found unexpected pod in activeQ: %s", pod.Name)
}
}
})
}
}
}
func newDefaultQueueSort() framework.LessFunc {
sort := &queuesort.PrioritySort{}
return sort.Less
}
func TestUpdatePodInCache(t *testing.T) {
ttl := 10 * time.Second
nodeName := "node"
@@ -81,8 +234,8 @@ func TestUpdatePodInCache(t *testing.T) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
sched := &Scheduler{
Cache: cache.New(ctx, ttl),
Cache: internalcache.New(ctx, ttl),
SchedulingQueue: queue.NewTestQueue(ctx, nil),
SchedulingQueue: internalqueue.NewTestQueue(ctx, nil),
logger: logger,
}
sched.addPodToCache(tt.oldObj)
@@ -354,7 +507,7 @@ func TestAddAllEventHandlers(t *testing.T) {
defer cancel()
informerFactory := informers.NewSharedInformerFactory(fake.NewClientset(), 0)
schedulingQueue := queue.NewTestQueueWithInformerFactory(ctx, nil, informerFactory)
schedulingQueue := internalqueue.NewTestQueueWithInformerFactory(ctx, nil, informerFactory)
testSched := Scheduler{
StopEverything: ctx.Done(),
SchedulingQueue: schedulingQueue,


@@ -143,7 +143,7 @@ func (pl *NodePorts) isSchedulableAfterPodDeleted(logger klog.Logger, pod *v1.Po
}
// If the deleted pod is unscheduled, it doesn't make the target pod schedulable.
if deletedPod.Spec.NodeName == "" {
if deletedPod.Spec.NodeName == "" && deletedPod.Status.NominatedNodeName == "" {
logger.V(4).Info("the deleted pod is unscheduled and it doesn't make the target pod schedulable", "pod", klog.KObj(pod), "deletedPod", klog.KObj(deletedPod))
return framework.QueueSkip, nil
}


@@ -294,7 +294,7 @@ func (f *Fit) isSchedulableAfterPodEvent(logger klog.Logger, pod *v1.Pod, oldObj
}
if modifiedPod == nil {
if originalPod.Spec.NodeName == "" {
if originalPod.Spec.NodeName == "" && originalPod.Status.NominatedNodeName == "" {
logger.V(5).Info("the deleted pod was unscheduled and it wouldn't make the unscheduled pod schedulable", "pod", klog.KObj(pod), "deletedPod", klog.KObj(originalPod))
return framework.QueueSkip, nil
}


@@ -104,7 +104,7 @@ func (pl *CSILimits) isSchedulableAfterPodDeleted(logger klog.Logger, pod *v1.Po
return framework.QueueSkip, nil
}
if deletedPod.Spec.NodeName == "" {
if deletedPod.Spec.NodeName == "" && deletedPod.Status.NominatedNodeName == "" {
return framework.QueueSkip, nil
}


@@ -1011,7 +1011,7 @@ func (f *frameworkImpl) RunFilterPluginsWithNominatedPods(ctx context.Context, s
nodeInfoToUse := info
if i == 0 {
var err error
podsAdded, stateToUse, nodeInfoToUse, err = addNominatedPods(ctx, f, pod, state, info)
podsAdded, stateToUse, nodeInfoToUse, err = addGENominatedPods(ctx, f, pod, state, info)
if err != nil {
return framework.AsStatus(err)
}
@@ -1028,10 +1028,10 @@ func (f *frameworkImpl) RunFilterPluginsWithNominatedPods(ctx context.Context, s
return status
}
// addNominatedPods adds pods with equal or greater priority which are nominated
// addGENominatedPods adds pods with equal or greater priority which are nominated
// to run on the node. It returns 1) whether any pod was added, 2) augmented cycleState,
// 3) augmented nodeInfo.
func addNominatedPods(ctx context.Context, fh framework.Handle, pod *v1.Pod, state *framework.CycleState, nodeInfo *framework.NodeInfo) (bool, *framework.CycleState, *framework.NodeInfo, error) {
func addGENominatedPods(ctx context.Context, fh framework.Handle, pod *v1.Pod, state *framework.CycleState, nodeInfo *framework.NodeInfo) (bool, *framework.CycleState, *framework.NodeInfo, error) {
if fh == nil {
// This may happen only in tests.
return false, state, nodeInfo, nil
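
The rename above makes the selection rule explicit: only nominated pods of greater-or-equal ("GE") priority are added to the node being filtered. A minimal, self-contained sketch of just that comparison (illustrative only; the real addGENominatedPods also clones the CycleState and NodeInfo):

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	corev1helpers "k8s.io/component-helpers/scheduling/corev1"
)

// shouldAddNominated mirrors the rule implied by addGENominatedPods: a nominated pod is
// treated as occupying the node only when its priority is greater than or equal to the
// priority of the pod currently being scheduled.
func shouldAddNominated(nominated, scheduling *v1.Pod) bool {
	return corev1helpers.PodPriority(nominated) >= corev1helpers.PodPriority(scheduling)
}

func pod(name string, priority int32) *v1.Pod {
	return &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: name}, Spec: v1.PodSpec{Priority: &priority}}
}

func main() {
	nominated := pod("nominated-mid", 200)
	// A lower-priority pod counts the nominated pod as assigned, which is why its
	// deletion (or un-nomination) has to re-queue lower or equal priority pods.
	fmt.Println(shouldAddNominated(nominated, pod("low", 100))) // true
	// A higher-priority pod never counted it, so no re-queue is needed there.
	fmt.Println(shouldAddNominated(nominated, pod("high", 300))) // false
}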


@@ -50,6 +50,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/metrics"
"k8s.io/kubernetes/pkg/scheduler/profile"
"k8s.io/kubernetes/pkg/scheduler/util/assumecache"
"k8s.io/utils/clock"
)
const (
@@ -116,6 +117,7 @@ func (sched *Scheduler) applyDefaultHandlers() {
}
type schedulerOptions struct {
clock clock.Clock
componentConfigVersion string
kubeConfig *restclient.Config
// Overridden by profile level percentageOfNodesToScore if set in v1.
@@ -227,6 +229,13 @@ func WithExtenders(e ...schedulerapi.Extender) Option {
}
}
// WithClock sets the clock for the PriorityQueue; the default is clock.RealClock.
func WithClock(clock clock.Clock) Option {
return func(o *schedulerOptions) {
o.clock = clock
}
}
// FrameworkCapturer is used for registering a notify function in building framework.
type FrameworkCapturer func(schedulerapi.KubeSchedulerProfile)
@@ -238,6 +247,7 @@ func WithBuildFrameworkCapturer(fc FrameworkCapturer) Option {
}
var defaultSchedulerOptions = schedulerOptions{
clock: clock.RealClock{},
percentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore,
podInitialBackoffSeconds: int64(internalqueue.DefaultPodInitialBackoffDuration.Seconds()),
podMaxBackoffSeconds: int64(internalqueue.DefaultPodMaxBackoffDuration.Seconds()),
@@ -343,6 +353,7 @@ func New(ctx context.Context,
podQueue := internalqueue.NewSchedulingQueue(
profiles[options.profiles[0].SchedulerName].QueueSortFunc(),
informerFactory,
internalqueue.WithClock(options.clock),
internalqueue.WithPodInitialBackoffDuration(time.Duration(options.podInitialBackoffSeconds)*time.Second),
internalqueue.WithPodMaxBackoffDuration(time.Duration(options.podMaxBackoffSeconds)*time.Second),
internalqueue.WithPodLister(podLister),


@@ -18,13 +18,19 @@ package eventhandler
import (
"context"
"fmt"
"testing"
"time"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/component-helpers/scheduling/corev1"
configv1 "k8s.io/kube-scheduler/config/v1"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler"
configtesting "k8s.io/kubernetes/pkg/scheduler/apis/config/testing"
"k8s.io/kubernetes/pkg/scheduler/framework"
@@ -32,9 +38,12 @@ import (
st "k8s.io/kubernetes/pkg/scheduler/testing"
schedulerutils "k8s.io/kubernetes/test/integration/scheduler"
testutils "k8s.io/kubernetes/test/integration/util"
testingclock "k8s.io/utils/clock/testing"
"k8s.io/utils/ptr"
)
var lowPriority, mediumPriority, highPriority int32 = 100, 200, 300
var _ framework.FilterPlugin = &fooPlugin{}
type fooPlugin struct {
@@ -135,3 +144,136 @@ func TestUpdateNodeEvent(t *testing.T) {
t.Errorf("Pod %v was not scheduled: %v", pod.Name, err)
}
}
func TestUpdateNominatedNodeName(t *testing.T) {
fakeClock := testingclock.NewFakeClock(time.Now())
testBackoff := time.Minute
testContext := testutils.InitTestAPIServer(t, "test-event", nil)
capacity := map[v1.ResourceName]string{
v1.ResourceMemory: "32",
}
var cleanupPods []*v1.Pod
testNode := st.MakeNode().Name("node-0").Label("kubernetes.io/hostname", "node-0").Capacity(capacity).Obj()
// Note that the low priority pod cannot fit together with the mid priority pod, but can fit together with the high priority one.
podLow := testutils.InitPausePod(&testutils.PausePodConfig{
Name: "test-lp-pod",
Namespace: testContext.NS.Name,
Priority: &lowPriority,
Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
v1.ResourceMemory: *resource.NewQuantity(20, resource.DecimalSI)},
}})
cleanupPods = append(cleanupPods, podLow)
podMidNominated := testutils.InitPausePod(&testutils.PausePodConfig{
Name: "test-nominated-pod",
Namespace: testContext.NS.Name,
Priority: &mediumPriority,
Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
v1.ResourceMemory: *resource.NewQuantity(25, resource.DecimalSI)},
}})
cleanupPods = append(cleanupPods, podMidNominated)
podHigh := testutils.InitPausePod(&testutils.PausePodConfig{
Name: "test-hp-pod",
Namespace: testContext.NS.Name,
Priority: &highPriority,
Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
v1.ResourceMemory: *resource.NewQuantity(10, resource.DecimalSI)},
}})
cleanupPods = append(cleanupPods, podHigh)
tests := []struct {
name string
updateFunc func(testCtx *testutils.TestContext)
}{
{
name: "Preempt nominated pod",
updateFunc: func(testCtx *testutils.TestContext) {
// Create high-priority pod and wait until it's scheduled (unnominate mid-priority pod)
pod, err := testutils.CreatePausePod(testCtx.ClientSet, podHigh)
if err != nil {
t.Fatalf("Creating pod error: %v", err)
}
if err = testutils.WaitForPodToSchedule(testCtx.Ctx, testCtx.ClientSet, pod); err != nil {
t.Fatalf("Pod %v was not scheduled: %v", pod.Name, err)
}
},
},
{
name: "Remove nominated pod",
updateFunc: func(testCtx *testutils.TestContext) {
if err := testutils.DeletePod(testCtx.ClientSet, podMidNominated.Name, podMidNominated.Namespace); err != nil {
t.Fatalf("Deleting pod error: %v", err)
}
},
},
}
for _, tt := range tests {
for _, qHintEnabled := range []bool{false, true} {
t.Run(fmt.Sprintf("%s, with queuehint(%v)", tt.name, qHintEnabled), func(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SchedulerQueueingHints, qHintEnabled)
testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 0, true,
scheduler.WithClock(fakeClock),
// updateFunc needs to be called while the nominated pod is still in the backoff queue, hence a small but non-zero value.
scheduler.WithPodInitialBackoffSeconds(int64(testBackoff.Seconds())),
scheduler.WithPodMaxBackoffSeconds(int64(testBackoff.Seconds())),
)
defer teardown()
_, err := testutils.CreateNode(testCtx.ClientSet, testNode)
if err != nil {
t.Fatalf("Creating node error: %v", err)
}
// Create initial low-priority pod and wait until it's scheduled.
pod, err := testutils.CreatePausePod(testCtx.ClientSet, podLow)
if err != nil {
t.Fatalf("Creating pod error: %v", err)
}
if err := testutils.WaitForPodToSchedule(testCtx.Ctx, testCtx.ClientSet, pod); err != nil {
t.Fatalf("Pod %v was not scheduled: %v", pod.Name, err)
}
// Create the mid-priority pod and wait until it becomes nominated (preempting the low-priority pod) and remains unschedulable.
pod, err = testutils.CreatePausePod(testCtx.ClientSet, podMidNominated)
if err != nil {
t.Fatalf("Creating pod error: %v", err)
}
if err := testutils.WaitForNominatedNodeName(testCtx.Ctx, testCtx.ClientSet, pod); err != nil {
t.Errorf("NominatedNodeName field was not set for pod %v: %v", pod.Name, err)
}
if err := testutils.WaitForPodUnschedulable(testCtx.Ctx, testCtx.ClientSet, pod); err != nil {
t.Errorf("Pod %v haven't become unschedulabe: %v", pod.Name, err)
}
// Remove the initial low-priority pod, which will move the nominated unschedulable pod back to the backoff queue.
if err := testutils.DeletePod(testCtx.ClientSet, podLow.Name, podLow.Namespace); err != nil {
t.Fatalf("Deleting pod error: %v", err)
}
// Create another low-priority pod, which cannot be scheduled because the mid-priority pod is nominated on the node and the node doesn't have enough resources for both.
pod, err = testutils.CreatePausePod(testCtx.ClientSet, podLow)
if err != nil {
t.Fatalf("Creating pod error: %v", err)
}
if err := testutils.WaitForPodUnschedulable(testCtx.Ctx, testCtx.ClientSet, pod); err != nil {
t.Fatalf("Pod %v was not scheduled: %v", pod.Name, err)
}
// Update causing the nominated pod to be removed or to get its nominated node name removed, which should trigger scheduling of the low priority pod.
// Note that the update has to happen while the nominated pod is still in the backoffQ, to actually test updates of nominated, but not yet bound, pods.
tt.updateFunc(testCtx)
// Advance time by the maxPodBackoffSeconds to move low priority pod out of the backoff queue.
fakeClock.Step(testBackoff)
// Expect the low-priority pod to be notified about the un-nominated mid-priority pod and get scheduled, as it should fit this time.
if err := testutils.WaitForPodToSchedule(testCtx.Ctx, testCtx.ClientSet, podLow); err != nil {
t.Fatalf("Pod %v was not scheduled: %v", podLow.Name, err)
}
testutils.CleanupPods(testCtx.Ctx, testCtx.ClientSet, t, cleanupPods)
})
}
}
}


@@ -84,26 +84,6 @@ const filterPluginName = "filter-plugin"
var lowPriority, mediumPriority, highPriority = int32(100), int32(200), int32(300)
func waitForNominatedNodeNameWithTimeout(ctx context.Context, cs clientset.Interface, pod *v1.Pod, timeout time.Duration) error {
if err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, timeout, false, func(ctx context.Context) (bool, error) {
pod, err := cs.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{})
if err != nil {
return false, err
}
if len(pod.Status.NominatedNodeName) > 0 {
return true, nil
}
return false, err
}); err != nil {
return fmt.Errorf(".status.nominatedNodeName of Pod %v/%v did not get set: %v", pod.Namespace, pod.Name, err)
}
return nil
}
func waitForNominatedNodeName(ctx context.Context, cs clientset.Interface, pod *v1.Pod) error {
return waitForNominatedNodeNameWithTimeout(ctx, cs, pod, wait.ForeverTestTimeout)
}
const tokenFilterName = "token-filter"
// tokenFilter is a fake plugin that implements PreFilter and Filter.
@@ -504,7 +484,7 @@ func TestPreemption(t *testing.T) {
}
// Also check that the preemptor pod gets the NominatedNodeName field set.
if len(test.preemptedPodIndexes) > 0 {
if err := waitForNominatedNodeName(testCtx.Ctx, cs, preemptor); err != nil {
if err := testutils.WaitForNominatedNodeName(testCtx.Ctx, cs, preemptor); err != nil {
t.Errorf("NominatedNodeName field was not set for pod %v: %v", preemptor.Name, err)
}
}
@@ -1086,7 +1066,7 @@ func TestNonPreemption(t *testing.T) {
t.Fatalf("Error while creating preemptor: %v", err)
}
err = waitForNominatedNodeNameWithTimeout(testCtx.Ctx, cs, preemptorPod, 5*time.Second)
err = testutils.WaitForNominatedNodeNameWithTimeout(testCtx.Ctx, cs, preemptorPod, 5*time.Second)
// test.PreemptionPolicy == nil means we expect the preemptor to be nominated.
expect := test.PreemptionPolicy == nil
// err == nil indicates the preemptor is indeed nominated.
@@ -1168,7 +1148,7 @@ func TestDisablePreemption(t *testing.T) {
}
// Ensure preemptor should not be nominated.
if err := waitForNominatedNodeNameWithTimeout(testCtx.Ctx, cs, preemptor, 5*time.Second); err == nil {
if err := testutils.WaitForNominatedNodeNameWithTimeout(testCtx.Ctx, cs, preemptor, 5*time.Second); err == nil {
t.Errorf("Preemptor %v should not be nominated", preemptor.Name)
}
@@ -1381,7 +1361,7 @@ func TestPreemptionStarvation(t *testing.T) {
t.Errorf("Error while creating the preempting pod: %v", err)
}
// Check if .status.nominatedNodeName of the preemptor pod gets set.
if err := waitForNominatedNodeName(testCtx.Ctx, cs, preemptor); err != nil {
if err := testutils.WaitForNominatedNodeName(testCtx.Ctx, cs, preemptor); err != nil {
t.Errorf(".status.nominatedNodeName was not set for pod %v/%v: %v", preemptor.Namespace, preemptor.Name, err)
}
// Make sure that preemptor is scheduled after preemptions.
@@ -1481,7 +1461,7 @@ func TestPreemptionRaces(t *testing.T) {
}
}
// Check that the preemptor pod gets nominated node name.
if err := waitForNominatedNodeName(testCtx.Ctx, cs, preemptor); err != nil {
if err := testutils.WaitForNominatedNodeName(testCtx.Ctx, cs, preemptor); err != nil {
t.Errorf(".status.nominatedNodeName was not set for pod %v/%v: %v", preemptor.Namespace, preemptor.Name, err)
}
// Make sure that preemptor is scheduled after preemptions.
@@ -1577,8 +1557,8 @@ func TestNominatedNodeCleanUp(t *testing.T) {
},
postChecks: []func(ctx context.Context, cs clientset.Interface, pod *v1.Pod) error{
testutils.WaitForPodToSchedule,
waitForNominatedNodeName,
testutils.WaitForNominatedNodeName,
waitForNominatedNodeName,
testutils.WaitForNominatedNodeName,
},
},
{
@@ -1597,7 +1577,7 @@ func TestNominatedNodeCleanUp(t *testing.T) {
},
postChecks: []func(ctx context.Context, cs clientset.Interface, pod *v1.Pod) error{
testutils.WaitForPodToSchedule,
waitForNominatedNodeName,
testutils.WaitForNominatedNodeName,
testutils.WaitForPodToSchedule,
},
podNamesToDelete: []string{"low"},
@@ -1615,7 +1595,7 @@ func TestNominatedNodeCleanUp(t *testing.T) {
},
postChecks: []func(ctx context.Context, cs clientset.Interface, pod *v1.Pod) error{
testutils.WaitForPodToSchedule,
waitForNominatedNodeName,
testutils.WaitForNominatedNodeName,
},
// Delete the node to simulate an ErrNoNodesAvailable error.
deleteNode: true,
@@ -1634,7 +1614,7 @@ func TestNominatedNodeCleanUp(t *testing.T) {
},
postChecks: []func(ctx context.Context, cs clientset.Interface, pod *v1.Pod) error{
testutils.WaitForPodToSchedule,
waitForNominatedNodeName,
testutils.WaitForNominatedNodeName,
},
podNamesToDelete: []string{fmt.Sprintf("low-%v", doNotFailMe)},
customPlugins: &configv1.Plugins{
@@ -1990,7 +1970,7 @@ func TestPDBInPreemption(t *testing.T) {
}
// Also check if .status.nominatedNodeName of the preemptor pod gets set.
if len(test.preemptedPodIndexes) > 0 {
if err := waitForNominatedNodeName(testCtx.Ctx, cs, preemptor); err != nil {
if err := testutils.WaitForNominatedNodeName(testCtx.Ctx, cs, preemptor); err != nil {
t.Errorf("Test [%v]: .status.nominatedNodeName was not set for pod %v/%v: %v", test.name, preemptor.Namespace, preemptor.Name, err)
}
}
@@ -2483,7 +2463,7 @@ func TestReadWriteOncePodPreemption(t *testing.T) {
}
// Also check that the preemptor pod gets the NominatedNodeName field set.
if len(test.preemptedPodIndexes) > 0 {
if err := waitForNominatedNodeName(testCtx.Ctx, cs, preemptor); err != nil {
if err := testutils.WaitForNominatedNodeName(testCtx.Ctx, cs, preemptor); err != nil {
t.Errorf("NominatedNodeName field was not set for pod %v: %v", preemptor.Name, err)
}
}


@@ -1160,3 +1160,23 @@ func NextPodOrDie(t *testing.T, testCtx *TestContext) *schedulerframework.Queued
}
return podInfo
}
func WaitForNominatedNodeNameWithTimeout(ctx context.Context, cs clientset.Interface, pod *v1.Pod, timeout time.Duration) error {
if err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, timeout, false, func(ctx context.Context) (bool, error) {
pod, err := cs.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{})
if err != nil {
return false, err
}
if len(pod.Status.NominatedNodeName) > 0 {
return true, nil
}
return false, err
}); err != nil {
return fmt.Errorf(".status.nominatedNodeName of Pod %v/%v did not get set: %w", pod.Namespace, pod.Name, err)
}
return nil
}
func WaitForNominatedNodeName(ctx context.Context, cs clientset.Interface, pod *v1.Pod) error {
return WaitForNominatedNodeNameWithTimeout(ctx, cs, pod, wait.ForeverTestTimeout)
}