diff --git a/pkg/scheduler/factory.go b/pkg/scheduler/factory.go index cf3887dd22f..8662faf15c5 100644 --- a/pkg/scheduler/factory.go +++ b/pkg/scheduler/factory.go @@ -343,6 +343,14 @@ func MakeDefaultErrorFunc(client clientset.Interface, podLister corelisters.PodL klog.Warningf("Pod %v/%v doesn't exist in informer cache: %v", pod.Namespace, pod.Name, err) return } + + // In the case of extender, the pod may have been bound successfully, but timed out returning its response to the scheduler. + // It could result in the live version to carry .spec.nodeName, and that's inconsistent with the internal-queued version. + if len(cachedPod.Spec.NodeName) != 0 { + klog.Warningf("Pod %v/%v has been assigned with %v. Abort adding it back to queue.", pod.Namespace, pod.Name, cachedPod.Spec.NodeName) + return + } + // As is from SharedInformer, we need to do a DeepCopy() here. podInfo.Pod = cachedPod.DeepCopy() if err := podQueue.AddUnschedulableIfNotPresent(podInfo, podQueue.SchedulingCycle()); err != nil { diff --git a/pkg/scheduler/factory_test.go b/pkg/scheduler/factory_test.go index 9336c456fa2..2de4217fa3a 100644 --- a/pkg/scheduler/factory_test.go +++ b/pkg/scheduler/factory_test.go @@ -19,6 +19,7 @@ package scheduler import ( "context" "errors" + "fmt" "testing" "time" @@ -553,6 +554,35 @@ func TestDefaultErrorFunc_NodeNotFound(t *testing.T) { } } +func TestDefaultErrorFunc_PodAlreadyBound(t *testing.T) { + stopCh := make(chan struct{}) + defer close(stopCh) + + nodeFoo := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "foo"}} + testPod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-pod", Namespace: "default"}, Spec: v1.PodSpec{NodeName: "foo"}} + + client := fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*testPod}}, &v1.NodeList{Items: []v1.Node{nodeFoo}}) + informerFactory := informers.NewSharedInformerFactory(client, 0) + podInformer := informerFactory.Core().V1().Pods() + // Need to add testPod to the store. + podInformer.Informer().GetStore().Add(testPod) + + queue := internalqueue.NewPriorityQueue(nil, internalqueue.WithClock(clock.NewFakeClock(time.Now()))) + schedulerCache := internalcache.New(30*time.Second, stopCh) + + // Add node to schedulerCache no matter it's deleted in API server or not. + schedulerCache.AddNode(&nodeFoo) + + testPodInfo := &framework.QueuedPodInfo{Pod: testPod} + errFunc := MakeDefaultErrorFunc(client, podInformer.Lister(), queue, schedulerCache) + errFunc(testPodInfo, fmt.Errorf("binding rejected: timeout")) + + pod := getPodFromPriorityQueue(queue, testPod) + if pod != nil { + t.Fatalf("Unexpected pod: %v should not be in PriorityQueue when the NodeName of pod is not empty", pod.Name) + } +} + // getPodFromPriorityQueue is the function used in the TestDefaultErrorFunc test to get // the specific pod from the given priority queue. It returns the found pod in the priority queue. func getPodFromPriorityQueue(queue *internalqueue.PriorityQueue, pod *v1.Pod) *v1.Pod {