diff --git a/pkg/scheduler/factory.go b/pkg/scheduler/factory.go
index 5e555a779c4..bc5ad6754da 100644
--- a/pkg/scheduler/factory.go
+++ b/pkg/scheduler/factory.go
@@ -29,8 +29,6 @@ import (
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/fields"
     "k8s.io/apimachinery/pkg/runtime"
-    "k8s.io/apimachinery/pkg/types"
-    utilruntime "k8s.io/apimachinery/pkg/util/runtime"
     "k8s.io/apimachinery/pkg/util/sets"
     utilfeature "k8s.io/apiserver/pkg/util/feature"
     "k8s.io/client-go/informers"
@@ -58,11 +56,6 @@ import (
     "k8s.io/kubernetes/pkg/scheduler/profile"
 )
 
-const (
-    initialGetBackoff = 100 * time.Millisecond
-    maximalGetBackoff = time.Minute
-)
-
 // Binder knows how to write a binding.
 type Binder interface {
     Bind(binding *v1.Binding) error
@@ -206,7 +199,7 @@ func (c *Configurator) create() (*Scheduler, error) {
         Algorithm:       algo,
         Profiles:        profiles,
         NextPod:         internalqueue.MakeNextPodFunc(podQueue),
-        Error:           MakeDefaultErrorFunc(c.client, podQueue, c.schedulerCache),
+        Error:           MakeDefaultErrorFunc(c.client, c.informerFactory.Core().V1().Pods().Lister(), podQueue, c.schedulerCache),
         StopEverything:  c.StopEverything,
         SchedulingQueue: podQueue,
     }, nil
@@ -476,7 +469,7 @@ func NewPodInformer(client clientset.Interface, resyncPeriod time.Duration) core
 }
 
 // MakeDefaultErrorFunc construct a function to handle pod scheduler error
-func MakeDefaultErrorFunc(client clientset.Interface, podQueue internalqueue.SchedulingQueue, schedulerCache internalcache.Cache) func(*framework.QueuedPodInfo, error) {
+func MakeDefaultErrorFunc(client clientset.Interface, podLister corelisters.PodLister, podQueue internalqueue.SchedulingQueue, schedulerCache internalcache.Cache) func(*framework.QueuedPodInfo, error) {
     return func(podInfo *framework.QueuedPodInfo, err error) {
         pod := podInfo.Pod
         if err == core.ErrNoNodesAvailable {
@@ -501,40 +494,17 @@ func MakeDefaultErrorFunc(client clientset.Interface, podQueue internalqueue.Sch
             klog.Errorf("Error scheduling %v/%v: %v; retrying", pod.Namespace, pod.Name, err)
         }
 
-        podSchedulingCycle := podQueue.SchedulingCycle()
-        // Retry asynchronously.
-        // Note that this is extremely rudimentary and we need a more real error handling path.
-        go func() {
-            defer utilruntime.HandleCrash()
-            podID := types.NamespacedName{
-                Namespace: pod.Namespace,
-                Name:      pod.Name,
-            }
-
-            // Get the pod again; it may have changed/been scheduled already.
-            getBackoff := initialGetBackoff
-            for {
-                pod, err := client.CoreV1().Pods(podID.Namespace).Get(context.TODO(), podID.Name, metav1.GetOptions{})
-                if err == nil {
-                    if len(pod.Spec.NodeName) == 0 {
-                        podInfo.Pod = pod
-                        if err := podQueue.AddUnschedulableIfNotPresent(podInfo, podSchedulingCycle); err != nil {
-                            klog.Error(err)
-                        }
-                    }
-                    break
-                }
-                if apierrors.IsNotFound(err) {
-                    klog.Warningf("A pod %v no longer exists", podID)
-                    return
-                }
-                klog.Errorf("Error getting pod %v for retry: %v; retrying...", podID, err)
-                if getBackoff = getBackoff * 2; getBackoff > maximalGetBackoff {
-                    getBackoff = maximalGetBackoff
-                }
-                time.Sleep(getBackoff)
-            }
-        }()
+        // Check if the Pod exists in informer cache.
+        cachedPod, err := podLister.Pods(pod.Namespace).Get(pod.Name)
+        if err != nil {
+            klog.Warningf("Pod %v/%v doesn't exist in informer cache: %v", pod.Namespace, pod.Name, err)
+            return
+        }
+        // As <cachedPod> is from SharedInformer, we need to do a DeepCopy() here.
+        podInfo.Pod = cachedPod.DeepCopy()
+        if err := podQueue.AddUnschedulableIfNotPresent(podInfo, podQueue.SchedulingCycle()); err != nil {
+            klog.Error(err)
+        }
     }
 }
diff --git a/pkg/scheduler/factory_test.go b/pkg/scheduler/factory_test.go
index e475c07e5a2..ca20ed125a2 100644
--- a/pkg/scheduler/factory_test.go
+++ b/pkg/scheduler/factory_test.go
@@ -19,7 +19,6 @@ package scheduler
 import (
     "context"
     "errors"
-    "reflect"
     "testing"
     "time"
 
@@ -29,10 +28,10 @@ import (
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/runtime"
     "k8s.io/apimachinery/pkg/util/clock"
+    "k8s.io/apimachinery/pkg/util/sets"
     "k8s.io/client-go/informers"
     clientset "k8s.io/client-go/kubernetes"
     "k8s.io/client-go/kubernetes/fake"
-    clienttesting "k8s.io/client-go/testing"
     "k8s.io/client-go/tools/cache"
     "k8s.io/client-go/tools/events"
     extenderv1 "k8s.io/kube-scheduler/extender/v1"
@@ -284,134 +283,152 @@ func TestCreateFromConfigWithUnspecifiedPredicatesOrPriorities(t *testing.T) {
 }
 
 func TestDefaultErrorFunc(t *testing.T) {
-    grace := int64(30)
-    testPod := &v1.Pod{
-        ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "bar"},
-        Spec: v1.PodSpec{
-            RestartPolicy:                 v1.RestartPolicyAlways,
-            DNSPolicy:                     v1.DNSClusterFirst,
-            TerminationGracePeriodSeconds: &grace,
-            SecurityContext:               &v1.PodSecurityContext{},
+    testPod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-pod", Namespace: "default"}}
+    testPodUpdated := testPod.DeepCopy()
+    testPodUpdated.Labels = map[string]string{"foo": ""}
+
+    tests := []struct {
+        name                       string
+        injectErr                  error
+        podUpdatedDuringScheduling bool // pod is updated during a scheduling cycle
+        podDeletedDuringScheduling bool // pod is deleted during a scheduling cycle
+        expect                     *v1.Pod
+    }{
+        {
+            name:                       "pod is updated during a scheduling cycle",
+            injectErr:                  nil,
+            podUpdatedDuringScheduling: true,
+            expect:                     testPodUpdated,
+        },
+        {
+            name:      "pod is not updated during a scheduling cycle",
+            injectErr: nil,
+            expect:    testPod,
+        },
+        {
+            name:                       "pod is deleted during a scheduling cycle",
+            injectErr:                  nil,
+            podDeletedDuringScheduling: true,
+            expect:                     nil,
         },
     }
-    nodeBar, nodeFoo :=
-        &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "bar"}},
-        &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
+    for _, tt := range tests {
+        t.Run(tt.name, func(t *testing.T) {
+            stopCh := make(chan struct{})
+            defer close(stopCh)
 
-    testPodInfo := &framework.QueuedPodInfo{Pod: testPod}
-    client := fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*testPod}}, &v1.NodeList{Items: []v1.Node{*nodeBar}})
-    stopCh := make(chan struct{})
-    defer close(stopCh)
+            client := fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*testPod}})
+            informerFactory := informers.NewSharedInformerFactory(client, 0)
+            podInformer := informerFactory.Core().V1().Pods()
+            // Need to add/update/delete testPod to the store.
+            podInformer.Informer().GetStore().Add(testPod)
 
-    timestamp := time.Now()
-    queue := internalqueue.NewPriorityQueue(nil, internalqueue.WithClock(clock.NewFakeClock(timestamp)))
-    schedulerCache := internalcache.New(30*time.Second, stopCh)
-    errFunc := MakeDefaultErrorFunc(client, queue, schedulerCache)
+            queue := internalqueue.NewPriorityQueue(nil, internalqueue.WithClock(clock.NewFakeClock(time.Now())))
+            schedulerCache := internalcache.New(30*time.Second, stopCh)
 
-    _ = schedulerCache.AddNode(nodeFoo)
+            queue.Add(testPod)
+            queue.Pop()
 
-    // assume nodeFoo was not found
-    err := apierrors.NewNotFound(apicore.Resource("node"), nodeFoo.Name)
-    errFunc(testPodInfo, err)
-    dump := schedulerCache.Dump()
-    for _, n := range dump.Nodes {
-        if e, a := nodeFoo, n.Node(); reflect.DeepEqual(e, a) {
-            t.Errorf("Node %s is still in schedulerCache", e.Name)
-            break
-        }
-    }
+            if tt.podUpdatedDuringScheduling {
+                podInformer.Informer().GetStore().Update(testPodUpdated)
+                queue.Update(testPod, testPodUpdated)
+            }
+            if tt.podDeletedDuringScheduling {
+                podInformer.Informer().GetStore().Delete(testPod)
+                queue.Delete(testPod)
+            }
 
-    // Try up to a minute to retrieve the error pod from priority queue
-    foundPodFlag := false
-    maxIterations := 10 * 60
-    for i := 0; i < maxIterations; i++ {
-        time.Sleep(100 * time.Millisecond)
-        got := getPodfromPriorityQueue(queue, testPod)
-        if got == nil {
-            continue
-        }
+            testPodInfo := &framework.QueuedPodInfo{Pod: testPod}
+            errFunc := MakeDefaultErrorFunc(client, podInformer.Lister(), queue, schedulerCache)
+            errFunc(testPodInfo, tt.injectErr)
 
-        testClientGetPodRequest(client, t, testPod.Namespace, testPod.Name)
+            var got *v1.Pod
+            if tt.podUpdatedDuringScheduling {
+                head, e := queue.Pop()
+                if e != nil {
+                    t.Fatalf("Cannot pop pod from the activeQ: %v", e)
+                }
+                got = head.Pod
+            } else {
+                got = getPodFromPriorityQueue(queue, testPod)
+            }
 
-        if e, a := testPod, got; !reflect.DeepEqual(e, a) {
-            t.Errorf("Expected %v, got %v", e, a)
-        }
-
-        foundPodFlag = true
-        break
-    }
-
-    if !foundPodFlag {
-        t.Errorf("Failed to get pod from the unschedulable queue after waiting for a minute: %v", testPod)
-    }
-
-    _ = queue.Delete(testPod)
-
-    // Trigger error handling again to put the pod in unschedulable queue
-    errFunc(testPodInfo, nil)
-
-    // Try up to a minute to retrieve the error pod from priority queue
-    foundPodFlag = false
-    maxIterations = 10 * 60
-    for i := 0; i < maxIterations; i++ {
-        time.Sleep(100 * time.Millisecond)
-        got := getPodfromPriorityQueue(queue, testPod)
-        if got == nil {
-            continue
-        }
-
-        testClientGetPodRequest(client, t, testPod.Namespace, testPod.Name)
-
-        if e, a := testPod, got; !reflect.DeepEqual(e, a) {
-            t.Errorf("Expected %v, got %v", e, a)
-        }
-
-        foundPodFlag = true
-        break
-    }
-
-    if !foundPodFlag {
-        t.Errorf("Failed to get pod from the unschedulable queue after waiting for a minute: %v", testPod)
-    }
-
-    // Remove the pod from priority queue to test putting error
-    // pod in backoff queue.
-    queue.Delete(testPod)
-
-    // Trigger a move request
-    queue.MoveAllToActiveOrBackoffQueue("test")
-
-    // Trigger error handling again to put the pod in backoff queue
-    errFunc(testPodInfo, nil)
-
-    foundPodFlag = false
-    for i := 0; i < maxIterations; i++ {
-        time.Sleep(100 * time.Millisecond)
-        // The pod should be found from backoff queue at this time
-        got := getPodfromPriorityQueue(queue, testPod)
-        if got == nil {
-            continue
-        }
-
-        testClientGetPodRequest(client, t, testPod.Namespace, testPod.Name)
-
-        if e, a := testPod, got; !reflect.DeepEqual(e, a) {
-            t.Errorf("Expected %v, got %v", e, a)
-        }
-
-        foundPodFlag = true
-        break
-    }
-
-    if !foundPodFlag {
-        t.Errorf("Failed to get pod from the backoff queue after waiting for a minute: %v", testPod)
+            if diff := cmp.Diff(tt.expect, got); diff != "" {
+                t.Errorf("Unexpected pod (-want, +got): %s", diff)
+            }
+        })
     }
 }
 
-// getPodfromPriorityQueue is the function used in the TestDefaultErrorFunc test to get
+func TestDefaultErrorFunc_NodeNotFound(t *testing.T) {
+    nodeFoo := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
+    nodeBar := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}
+    testPod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-pod", Namespace: "default"}}
+    tests := []struct {
+        name             string
+        nodes            []v1.Node
+        nodeNameToDelete string
+        injectErr        error
+        expectNodeNames  sets.String
+    }{
+        {
+            name:             "node is deleted during a scheduling cycle",
+            nodes:            []v1.Node{*nodeFoo, *nodeBar},
+            nodeNameToDelete: "foo",
+            injectErr:        apierrors.NewNotFound(apicore.Resource("node"), nodeFoo.Name),
+            expectNodeNames:  sets.NewString("bar"),
+        },
+        {
+            name:            "node is not deleted but NodeNotFound is received incorrectly",
+            nodes:           []v1.Node{*nodeFoo, *nodeBar},
+            injectErr:       apierrors.NewNotFound(apicore.Resource("node"), nodeFoo.Name),
+            expectNodeNames: sets.NewString("foo", "bar"),
+        },
+    }
+
+    for _, tt := range tests {
+        t.Run(tt.name, func(t *testing.T) {
+            stopCh := make(chan struct{})
+            defer close(stopCh)
+
+            client := fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*testPod}}, &v1.NodeList{Items: tt.nodes})
+            informerFactory := informers.NewSharedInformerFactory(client, 0)
+            podInformer := informerFactory.Core().V1().Pods()
+            // Need to add testPod to the store.
+            podInformer.Informer().GetStore().Add(testPod)
+
+            queue := internalqueue.NewPriorityQueue(nil, internalqueue.WithClock(clock.NewFakeClock(time.Now())))
+            schedulerCache := internalcache.New(30*time.Second, stopCh)
+
+            for i := range tt.nodes {
+                node := tt.nodes[i]
+                // Add node to schedulerCache no matter it's deleted in API server or not.
+                schedulerCache.AddNode(&node)
+                if node.Name == tt.nodeNameToDelete {
+                    client.CoreV1().Nodes().Delete(context.TODO(), node.Name, metav1.DeleteOptions{})
+                }
+            }
+
+            testPodInfo := &framework.QueuedPodInfo{Pod: testPod}
+            errFunc := MakeDefaultErrorFunc(client, podInformer.Lister(), queue, schedulerCache)
+            errFunc(testPodInfo, tt.injectErr)
+
+            gotNodes := schedulerCache.Dump().Nodes
+            gotNodeNames := sets.NewString()
+            for _, nodeInfo := range gotNodes {
+                gotNodeNames.Insert(nodeInfo.Node().Name)
+            }
+            if diff := cmp.Diff(tt.expectNodeNames, gotNodeNames); diff != "" {
+                t.Errorf("Unexpected nodes (-want, +got): %s", diff)
+            }
+        })
+    }
+}
+
+// getPodFromPriorityQueue is the function used in the TestDefaultErrorFunc test to get
 // the specific pod from the given priority queue. It returns the found pod in the priority queue.
-func getPodfromPriorityQueue(queue *internalqueue.PriorityQueue, pod *v1.Pod) *v1.Pod {
+func getPodFromPriorityQueue(queue *internalqueue.PriorityQueue, pod *v1.Pod) *v1.Pod {
     podList := queue.PendingPods()
     if len(podList) == 0 {
         return nil
@@ -436,33 +453,6 @@ func getPodfromPriorityQueue(queue *internalqueue.PriorityQueue, pod *v1.Pod) *v
     return nil
 }
 
-// testClientGetPodRequest function provides a routine used by TestDefaultErrorFunc test.
-// It tests whether the fake client can receive request and correctly "get" the namespace
-// and name of the error pod.
-func testClientGetPodRequest(client *fake.Clientset, t *testing.T, podNs string, podName string) {
-    requestReceived := false
-    actions := client.Actions()
-    for _, a := range actions {
-        if a.GetVerb() == "get" && a.GetResource().Resource == "pods" {
-            getAction, ok := a.(clienttesting.GetAction)
-            if !ok {
-                t.Errorf("Can't cast action object to GetAction interface")
-                break
-            }
-            name := getAction.GetName()
-            ns := a.GetNamespace()
-            if name != podName || ns != podNs {
-                t.Errorf("Expected name %s namespace %s, got %s %s",
-                    podName, podNs, name, ns)
-            }
-            requestReceived = true
-        }
-    }
-    if !requestReceived {
-        t.Errorf("Get pod request not received")
-    }
-}
-
 func newConfigFactoryWithFrameworkRegistry(
     client clientset.Interface, stopCh <-chan struct{},
     registry framework.Registry) *Configurator {
diff --git a/pkg/scheduler/internal/queue/scheduling_queue.go b/pkg/scheduler/internal/queue/scheduling_queue.go
index 4879f4f58ad..3b3b2d28944 100644
--- a/pkg/scheduler/internal/queue/scheduling_queue.go
+++ b/pkg/scheduler/internal/queue/scheduling_queue.go
@@ -563,7 +563,7 @@ func (p *PriorityQueue) NominatedPodsForNode(nodeName string) []*v1.Pod {
 func (p *PriorityQueue) PendingPods() []*v1.Pod {
     p.lock.RLock()
     defer p.lock.RUnlock()
-    result := []*v1.Pod{}
+    var result []*v1.Pod
     for _, pInfo := range p.activeQ.List() {
         result = append(result, pInfo.(*framework.QueuedPodInfo).Pod)
     }
diff --git a/test/integration/scheduler/framework_test.go b/test/integration/scheduler/framework_test.go
index dffac82fe3e..84df97e101e 100644
--- a/test/integration/scheduler/framework_test.go
+++ b/test/integration/scheduler/framework_test.go
@@ -1394,7 +1394,7 @@ func TestFilterPlugin(t *testing.T) {
     }
 
     // Create the master and the scheduler with the test plugin set.
-    testCtx := initTestSchedulerForFrameworkTest(t, testutils.InitTestMaster(t, "filter-plugin", nil), 2,
+    testCtx := initTestSchedulerForFrameworkTest(t, testutils.InitTestMaster(t, "filter-plugin", nil), 1,
         scheduler.WithProfiles(prof),
         scheduler.WithFrameworkOutOfTreeRegistry(registry))
     defer testutils.CleanupTest(t, testCtx)
@@ -1418,8 +1418,8 @@ func TestFilterPlugin(t *testing.T) {
         }
     }
 
-    if filterPlugin.numFilterCalled == 0 {
-        t.Errorf("Expected the filter plugin to be called.")
+    if filterPlugin.numFilterCalled != 1 {
+        t.Errorf("Expected the filter plugin to be called 1 time, but got %v.", filterPlugin.numFilterCalled)
     }
     filterPlugin.reset()