diff --git a/pkg/scheduler/internal/cache/cache_test.go b/pkg/scheduler/internal/cache/cache_test.go
index f4de7038339..420a79d4f90 100644
--- a/pkg/scheduler/internal/cache/cache_test.go
+++ b/pkg/scheduler/internal/cache/cache_test.go
@@ -235,6 +235,7 @@ func TestAssumePodScheduled(t *testing.T) {
 
 type testExpirePodStruct struct {
 	pod         *v1.Pod
+	finishBind  bool
 	assumedTime time.Time
 }
 
@@ -254,6 +255,7 @@ func TestExpirePod(t *testing.T) {
 	testPods := []*v1.Pod{
 		makeBasePod(t, nodeName, "test-1", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}}),
 		makeBasePod(t, nodeName, "test-2", "200m", "1Ki", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 8080, Protocol: "TCP"}}),
+		makeBasePod(t, nodeName, "test-3", "200m", "1Ki", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 8080, Protocol: "TCP"}}),
 	}
 	now := time.Now()
 	ttl := 10 * time.Second
@@ -264,26 +266,28 @@ func TestExpirePod(t *testing.T) {
 		wNodeInfo   *schedulernodeinfo.NodeInfo
 	}{{ // assumed pod would expires
 		pods: []*testExpirePodStruct{
-			{pod: testPods[0], assumedTime: now},
+			{pod: testPods[0], finishBind: true, assumedTime: now},
 		},
 		cleanupTime: now.Add(2 * ttl),
 		wNodeInfo:   schedulernodeinfo.NewNodeInfo(),
-	}, { // first one would expire, second one would not.
+	}, { // first one would expire, second and third would not.
 		pods: []*testExpirePodStruct{
-			{pod: testPods[0], assumedTime: now},
-			{pod: testPods[1], assumedTime: now.Add(3 * ttl / 2)},
+			{pod: testPods[0], finishBind: true, assumedTime: now},
+			{pod: testPods[1], finishBind: true, assumedTime: now.Add(3 * ttl / 2)},
+			{pod: testPods[2]},
 		},
 		cleanupTime: now.Add(2 * ttl),
 		wNodeInfo: newNodeInfo(
 			&schedulernodeinfo.Resource{
-				MilliCPU: 200,
-				Memory:   1024,
+				MilliCPU: 400,
+				Memory:   2048,
 			},
 			&schedulernodeinfo.Resource{
-				MilliCPU: 200,
-				Memory:   1024,
+				MilliCPU: 400,
+				Memory:   2048,
 			},
-			[]*v1.Pod{testPods[1]},
+			// Order gets altered when removing pods.
+			[]*v1.Pod{testPods[2], testPods[1]},
 			newHostPortInfoBuilder().add("TCP", "127.0.0.1", 8080).build(),
 			make(map[string]*schedulernodeinfo.ImageStateSummary),
 		),
@@ -294,11 +298,18 @@ func TestExpirePod(t *testing.T) {
 		cache := newSchedulerCache(ttl, time.Second, nil)
 
 		for _, pod := range tt.pods {
-			if err := assumeAndFinishBinding(cache, pod.pod, pod.assumedTime); err != nil {
-				t.Fatalf("assumePod failed: %v", err)
+			if err := cache.AssumePod(pod.pod); err != nil {
+				t.Fatal(err)
+			}
+			if !pod.finishBind {
+				continue
+			}
+			if err := cache.finishBinding(pod.pod, pod.assumedTime); err != nil {
+				t.Fatal(err)
 			}
 		}
-		// pods that have assumedTime + ttl < cleanupTime will get expired and removed
+		// pods that got bound and have assumedTime + ttl < cleanupTime will get
+		// expired and removed
 		cache.cleanupAssumedPods(tt.cleanupTime)
 		n := cache.nodes[nodeName]
 		if err := deepEqualWithoutGeneration(n, tt.wNodeInfo); err != nil {
diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go
index d1a05a3cb65..872481ccaa3 100644
--- a/pkg/scheduler/scheduler_test.go
+++ b/pkg/scheduler/scheduler_test.go
@@ -481,78 +481,6 @@ func TestSchedulerNoPhantomPodAfterDelete(t *testing.T) {
 	}
 }
 
-// Scheduler should preserve predicate constraint even if binding was longer
-// than cache ttl
-func TestSchedulerErrorWithLongBinding(t *testing.T) {
-	stop := make(chan struct{})
-	defer close(stop)
-
-	firstPod := podWithPort("foo", "", 8080)
-	conflictPod := podWithPort("bar", "", 8080)
-	pods := map[string]*v1.Pod{firstPod.Name: firstPod, conflictPod.Name: conflictPod}
-	for _, test := range []struct {
-		name            string
-		Expected        map[string]bool
-		CacheTTL        time.Duration
-		BindingDuration time.Duration
-	}{
-		{
-			name:            "long cache ttl",
-			Expected:        map[string]bool{firstPod.Name: true},
-			CacheTTL:        100 * time.Millisecond,
-			BindingDuration: 300 * time.Millisecond,
-		},
-		{
-			name:            "short cache ttl",
-			Expected:        map[string]bool{firstPod.Name: true},
-			CacheTTL:        10 * time.Second,
-			BindingDuration: 300 * time.Millisecond,
-		},
-	} {
-		t.Run(test.name, func(t *testing.T) {
-			queuedPodStore := clientcache.NewFIFO(clientcache.MetaNamespaceKeyFunc)
-			scache := internalcache.New(test.CacheTTL, stop)
-
-			node := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}}
-			scache.AddNode(&node)
-
-			client := clientsetfake.NewSimpleClientset(&node)
-			informerFactory := informers.NewSharedInformerFactory(client, 0)
-			fns := []st.RegisterPluginFunc{
-				st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
-				st.RegisterPluginAsExtensions(nodeports.Name, 1, nodeports.New, "Filter", "PreFilter"),
-			}
-
-			scheduler, bindingChan := setupTestSchedulerLongBindingWithRetry(
-				queuedPodStore, scache, informerFactory, stop, test.BindingDuration, fns...)
-
-			informerFactory.Start(stop)
-			informerFactory.WaitForCacheSync(stop)
-
-			go scheduler.Run(context.Background())
-			queuedPodStore.Add(firstPod)
-			queuedPodStore.Add(conflictPod)
-
-			resultBindings := map[string]bool{}
-			waitChan := time.After(5 * time.Second)
-			for finished := false; !finished; {
-				select {
-				case b := <-bindingChan:
-					resultBindings[b.Name] = true
-					p := pods[b.Name]
-					p.Spec.NodeName = b.Target.Name
-					scache.AddPod(p)
-				case <-waitChan:
-					finished = true
-				}
-			}
-			if !reflect.DeepEqual(resultBindings, test.Expected) {
-				t.Errorf("Result binding are not equal to expected. %v != %v", resultBindings, test.Expected)
-			}
-		})
-	}
-}
-
 // queuedPodStore: pods queued before processing.
 // cache: scheduler cache that might contain assumed pods.
 func setupTestSchedulerWithOnePodOnNode(t *testing.T, queuedPodStore *clientcache.FIFO, scache internalcache.Cache,
@@ -727,66 +655,6 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C
 	return sched, bindingChan, errChan
 }
 
-func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, scache internalcache.Cache, informerFactory informers.SharedInformerFactory, stop chan struct{}, bindingTime time.Duration, fns ...st.RegisterPluginFunc) (*Scheduler, chan *v1.Binding) {
-	registry := framework.Registry{}
-	// TODO: instantiate the plugins dynamically.
-	plugins := &schedulerapi.Plugins{
-		QueueSort: &schedulerapi.PluginSet{},
-		PreFilter: &schedulerapi.PluginSet{},
-		Filter:    &schedulerapi.PluginSet{},
-	}
-	var pluginConfigs []schedulerapi.PluginConfig
-	for _, f := range fns {
-		f(&registry, plugins, pluginConfigs)
-	}
-	fwk, _ := framework.NewFramework(registry, plugins, pluginConfigs)
-	queue := internalqueue.NewSchedulingQueue(nil)
-	algo := core.NewGenericScheduler(
-		scache,
-		queue,
-		internalcache.NewEmptySnapshot(),
-		fwk,
-		[]algorithm.SchedulerExtender{},
-		nil,
-		informerFactory.Core().V1().PersistentVolumeClaims().Lister(),
-		informerFactory.Policy().V1beta1().PodDisruptionBudgets().Lister(),
-		false,
-		schedulerapi.DefaultPercentageOfNodesToScore,
-		false,
-	)
-	bindingChan := make(chan *v1.Binding, 2)
-
-	sched := &Scheduler{
-		SchedulerCache: scache,
-		Algorithm:      algo,
-		GetBinder: func(pod *v1.Pod) Binder {
-			return fakeBinder{func(b *v1.Binding) error {
-				time.Sleep(bindingTime)
-				bindingChan <- b
-				return nil
-			}}
-		},
-		scheduledPodsHasSynced: func() bool {
-			return true
-		},
-		NextPod: func() *framework.PodInfo {
-			return &framework.PodInfo{Pod: clientcache.Pop(queuedPodStore).(*v1.Pod)}
-		},
-		Error: func(p *framework.PodInfo, err error) {
-			queuedPodStore.AddIfNotPresent(p)
-		},
-		Recorder:            &events.FakeRecorder{},
-		podConditionUpdater: fakePodConditionUpdater{},
-		podPreemptor:        fakePodPreemptor{},
-		StopEverything:      stop,
-		Framework:           fwk,
-		VolumeBinder:        volumebinder.NewFakeVolumeBinder(&volumescheduling.FakeVolumeBinderConfig{AllBound: true}),
-		SchedulingQueue:     queue,
-	}
-
-	return sched, bindingChan
-}
-
 func setupTestSchedulerWithVolumeBinding(fakeVolumeBinder *volumebinder.VolumeBinder, stop <-chan struct{}, broadcaster events.EventBroadcaster) (*Scheduler, chan *v1.Binding, chan error) {
 	testNode := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}}
 	queuedPodStore := clientcache.NewFIFO(clientcache.MetaNamespaceKeyFunc)