diff --git a/pkg/scheduler/factory.go b/pkg/scheduler/factory.go index 089a3469be1..6bc62fa718a 100644 --- a/pkg/scheduler/factory.go +++ b/pkg/scheduler/factory.go @@ -136,7 +136,7 @@ func (c *Configurator) create() (*Scheduler, error) { } // The nominator will be passed all the way to framework instantiation. - nominator := internalqueue.NewPodNominator() + nominator := internalqueue.NewSafePodNominator(c.informerFactory.Core().V1().Pods().Lister()) profiles, err := profile.NewMap(c.profiles, c.registry, c.recorderFactory, frameworkruntime.WithClientSet(c.client), frameworkruntime.WithKubeConfig(c.kubeConfig), diff --git a/pkg/scheduler/internal/queue/scheduling_queue.go b/pkg/scheduler/internal/queue/scheduling_queue.go index a7e4cc4bc22..956de18b84c 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue.go +++ b/pkg/scheduler/internal/queue/scheduling_queue.go @@ -767,6 +767,8 @@ func newUnschedulablePodsMap(metricRecorder metrics.MetricRecorder) *Unschedulab // may be different than what scheduler has here. We should be able to find pods // by their UID and update/delete them. type nominatedPodMap struct { + // podLister is used to verify if the given pod is alive. + podLister listersv1.PodLister // nominatedPods is a map keyed by a node name and the value is a list of // pods which are nominated to run on the node. These are pods which can be in // the activeQ or unschedulableQ. @@ -790,6 +792,15 @@ func (npm *nominatedPodMap) add(pi *framework.PodInfo, nodeName string) { return } } + + if npm.podLister != nil { + // If the pod is not alive, don't contain it. + if _, err := npm.podLister.Pods(pi.Pod.Namespace).Get(pi.Pod.Name); err != nil { + klog.V(4).InfoS("Pod doesn't exist in podLister, aborting adding it to the nominated map", "pod", klog.KObj(pi.Pod)) + return + } + } + npm.nominatedPodToNode[pi.Pod.UID] = nnn for _, npi := range npm.nominatedPods[nnn] { if npi.Pod.UID == pi.Pod.UID { @@ -842,8 +853,17 @@ func (npm *nominatedPodMap) UpdateNominatedPod(oldPod *v1.Pod, newPodInfo *frame } // NewPodNominator creates a nominatedPodMap as a backing of framework.PodNominator. +// DEPRECATED: use NewSafePodNominator() instead. func NewPodNominator() framework.PodNominator { + return NewSafePodNominator(nil) +} + +// NewSafePodNominator creates a nominatedPodMap as a backing of framework.PodNominator. +// Unlike NewPodNominator, it passes in a podLister so as to check if the pod is alive +// before adding its nominatedNode info. +func NewSafePodNominator(podLister listersv1.PodLister) framework.PodNominator { return &nominatedPodMap{ + podLister: podLister, nominatedPods: make(map[string][]*framework.PodInfo), nominatedPodToNode: make(map[types.UID]string), } @@ -870,7 +890,6 @@ func podInfoKeyFunc(obj interface{}) (string, error) { // Checks if the Pod may become schedulable upon the event. // This is achieved by looking up the global clusterEventMap registry. func (p *PriorityQueue) podMatchesEvent(podInfo *framework.QueuedPodInfo, clusterEvent framework.ClusterEvent) bool { - if clusterEvent.IsWildCard() { return true } diff --git a/pkg/scheduler/internal/queue/scheduling_queue_test.go b/pkg/scheduler/internal/queue/scheduling_queue_test.go index dd98a45f9a4..73d54c03d7f 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue_test.go +++ b/pkg/scheduler/internal/queue/scheduling_queue_test.go @@ -26,12 +26,15 @@ import ( "time" "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/clock" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes/fake" "k8s.io/component-base/metrics/testutil" podutil "k8s.io/kubernetes/pkg/api/v1/pod" "k8s.io/kubernetes/pkg/scheduler/framework" @@ -145,8 +148,8 @@ func TestPriorityQueue_Add(t *testing.T) { "node1": {medPriorityPodInfo, unschedulablePodInfo}, }, } - if !reflect.DeepEqual(q.PodNominator, expectedNominatedPods) { - t.Errorf("Unexpected nominated map after adding pods. Expected: %v, got: %v", expectedNominatedPods, q.PodNominator) + if diff := cmp.Diff(q.PodNominator, expectedNominatedPods, cmp.AllowUnexported(nominatedPodMap{}), cmpopts.IgnoreFields(nominatedPodMap{}, "RWMutex")); diff != "" { + t.Errorf("Unexpected diff after adding pods (-want, +got):\n%s", diff) } if p, err := q.Pop(); err != nil || p.Pod != highPriorityPodInfo.Pod { t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPodInfo.Pod.Name, p.Pod.Name) @@ -197,8 +200,8 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) { "node1": {highPriNominatedPodInfo, unschedulablePodInfo}, }, } - if !reflect.DeepEqual(q.PodNominator, expectedNominatedPods) { - t.Errorf("Unexpected nominated map after adding pods. Expected: %v, got: %v", expectedNominatedPods, q.PodNominator) + if diff := cmp.Diff(q.PodNominator, expectedNominatedPods, cmp.AllowUnexported(nominatedPodMap{}), cmpopts.IgnoreFields(nominatedPodMap{}, "RWMutex")); diff != "" { + t.Errorf("Unexpected diff after adding pods (-want, +got):\n%s", diff) } if p, err := q.Pop(); err != nil || p.Pod != highPriNominatedPodInfo.Pod { t.Errorf("Expected: %v after Pop, but got: %v", highPriNominatedPodInfo.Pod.Name, p.Pod.Name) @@ -643,6 +646,57 @@ func TestPriorityQueue_NominatedPodsForNode(t *testing.T) { } } +func TestPriorityQueue_NominatedPodDeleted(t *testing.T) { + tests := []struct { + name string + podInfo *framework.PodInfo + deletePod bool + want bool + }{ + { + name: "alive pod gets added into PodNominator", + podInfo: medPriorityPodInfo, + want: true, + }, + { + name: "deleted pod shouldn't be added into PodNominator", + podInfo: highPriNominatedPodInfo, + deletePod: true, + want: false, + }, + { + name: "pod without .status.nominatedPodName specified shouldn't be added into PodNominator", + podInfo: highPriorityPodInfo, + want: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cs := fake.NewSimpleClientset(tt.podInfo.Pod) + informerFactory := informers.NewSharedInformerFactory(cs, 0) + podLister := informerFactory.Core().V1().Pods().Lister() + + // Build a PriorityQueue. + q := NewPriorityQueue(newDefaultQueueSort(), informerFactory, WithPodNominator(NewSafePodNominator(podLister))) + ctx := context.Background() + informerFactory.Start(ctx.Done()) + informerFactory.WaitForCacheSync(ctx.Done()) + + if tt.deletePod { + // Simulate that the test pod gets deleted physically. + informerFactory.Core().V1().Pods().Informer().GetStore().Delete(tt.podInfo.Pod) + } + + q.AddNominatedPod(tt.podInfo, tt.podInfo.Pod.Status.NominatedNodeName) + + if got := len(q.NominatedPodsForNode(tt.podInfo.Pod.Status.NominatedNodeName)) == 1; got != tt.want { + t.Errorf("Want %v, but got %v", tt.want, got) + } + }) + } +} + func TestPriorityQueue_PendingPods(t *testing.T) { makeSet := func(pods []*v1.Pod) map[*v1.Pod]struct{} { pendingSet := map[*v1.Pod]struct{}{} @@ -690,15 +744,15 @@ func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) { "node5": {unschedulablePodInfo}, }, } - if !reflect.DeepEqual(q.PodNominator, expectedNominatedPods) { - t.Errorf("Unexpected nominated map after adding pods. Expected: %v, got: %v", expectedNominatedPods, q.PodNominator) + if diff := cmp.Diff(q.PodNominator, expectedNominatedPods, cmp.AllowUnexported(nominatedPodMap{}), cmpopts.IgnoreFields(nominatedPodMap{}, "RWMutex")); diff != "" { + t.Errorf("Unexpected diff after adding pods (-want, +got):\n%s", diff) } if p, err := q.Pop(); err != nil || p.Pod != medPriorityPodInfo.Pod { t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPodInfo.Pod.Name, p.Pod.Name) } // List of nominated pods shouldn't change after popping them from the queue. - if !reflect.DeepEqual(q.PodNominator, expectedNominatedPods) { - t.Errorf("Unexpected nominated map after popping pods. Expected: %v, got: %v", expectedNominatedPods, q.PodNominator) + if diff := cmp.Diff(q.PodNominator, expectedNominatedPods, cmp.AllowUnexported(nominatedPodMap{}), cmpopts.IgnoreFields(nominatedPodMap{}, "RWMutex")); diff != "" { + t.Errorf("Unexpected diff after popping pods (-want, +got):\n%s", diff) } // Update one of the nominated pods that doesn't have nominatedNodeName in the // pod object. It should be updated correctly. @@ -715,8 +769,8 @@ func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) { "node5": {unschedulablePodInfo}, }, } - if !reflect.DeepEqual(q.PodNominator, expectedNominatedPods) { - t.Errorf("Unexpected nominated map after updating pods. Expected: %v, got: %v", expectedNominatedPods, q.PodNominator) + if diff := cmp.Diff(q.PodNominator, expectedNominatedPods, cmp.AllowUnexported(nominatedPodMap{}), cmpopts.IgnoreFields(nominatedPodMap{}, "RWMutex")); diff != "" { + t.Errorf("Unexpected diff after updating pods (-want, +got):\n%s", diff) } // Delete a nominated pod that doesn't have nominatedNodeName in the pod @@ -732,8 +786,8 @@ func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) { "node5": {unschedulablePodInfo}, }, } - if !reflect.DeepEqual(q.PodNominator, expectedNominatedPods) { - t.Errorf("Unexpected nominated map after deleting pods. Expected: %v, got: %v", expectedNominatedPods, q.PodNominator) + if diff := cmp.Diff(q.PodNominator, expectedNominatedPods, cmp.AllowUnexported(nominatedPodMap{}), cmpopts.IgnoreFields(nominatedPodMap{}, "RWMutex")); diff != "" { + t.Errorf("Unexpected diff after deleting pods (-want, +got):\n%s", diff) } }