diff --git a/pkg/scheduler/BUILD b/pkg/scheduler/BUILD index f307cb86699..b6b9a926708 100644 --- a/pkg/scheduler/BUILD +++ b/pkg/scheduler/BUILD @@ -90,7 +90,6 @@ go_test( "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", @@ -102,6 +101,7 @@ go_test( "//staging/src/k8s.io/client-go/testing:go_default_library", "//staging/src/k8s.io/client-go/tools/cache:go_default_library", "//staging/src/k8s.io/client-go/tools/events:go_default_library", + "//vendor/github.com/google/go-cmp/cmp:go_default_library", ], ) diff --git a/pkg/scheduler/factory.go b/pkg/scheduler/factory.go index 808ddc3f496..d372fb264f6 100644 --- a/pkg/scheduler/factory.go +++ b/pkg/scheduler/factory.go @@ -157,7 +157,6 @@ func (c *Configurator) create(extenders []algorithm.SchedulerExtender) (*Schedul return &Scheduler{ SchedulerCache: c.schedulerCache, Algorithm: algo, - GetBinder: getBinderFunc(c.client, extenders), Framework: framework, NextPod: internalqueue.MakeNextPodFunc(podQueue), Error: MakeDefaultErrorFunc(c.client, podQueue, c.schedulerCache), @@ -378,19 +377,6 @@ func getPredicateConfigs(keys sets.String, lr *frameworkplugins.LegacyRegistry, return &plugins, pluginConfig, nil } -// getBinderFunc returns a func which returns an extender that supports bind or a default binder based on the given pod. -func getBinderFunc(client clientset.Interface, extenders []algorithm.SchedulerExtender) func(pod *v1.Pod) Binder { - defaultBinder := &binder{client} - return func(pod *v1.Pod) Binder { - for _, extender := range extenders { - if extender.IsBinder() && extender.IsInterested(pod) { - return extender - } - } - return defaultBinder - } -} - type podInformer struct { informer cache.SharedIndexInformer } @@ -482,19 +468,6 @@ func MakeDefaultErrorFunc(client clientset.Interface, podQueue internalqueue.Sch } } -type binder struct { - Client clientset.Interface -} - -// Implement Binder interface -var _ Binder = &binder{} - -// Bind just does a POST binding RPC. -func (b *binder) Bind(binding *v1.Binding) error { - klog.V(3).Infof("Attempting to bind %v to %v", binding.Name, binding.Target.Name) - return b.Client.CoreV1().Pods(binding.Namespace).Bind(binding) -} - // GetPodDisruptionBudgetLister returns pdb lister from the given informer factory. Returns nil if PodDisruptionBudget feature is disabled. func GetPodDisruptionBudgetLister(informerFactory informers.SharedInformerFactory) policylisters.PodDisruptionBudgetLister { if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.PodDisruptionBudget) { diff --git a/pkg/scheduler/factory_test.go b/pkg/scheduler/factory_test.go index dc2a1c6da23..759e36dbbe4 100644 --- a/pkg/scheduler/factory_test.go +++ b/pkg/scheduler/factory_test.go @@ -20,7 +20,6 @@ import ( "context" "encoding/json" "errors" - "fmt" "reflect" "testing" "time" @@ -38,7 +37,6 @@ import ( "k8s.io/client-go/tools/cache" apitesting "k8s.io/kubernetes/pkg/api/testing" kubefeatures "k8s.io/kubernetes/pkg/features" - "k8s.io/kubernetes/pkg/scheduler/algorithm" schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/apis/config/scheme" extenderv1 "k8s.io/kubernetes/pkg/scheduler/apis/extender/v1" @@ -373,49 +371,32 @@ func testClientGetPodRequest(client *fake.Clientset, t *testing.T, podNs string, } } -func TestBind(t *testing.T) { - table := []struct { - name string - binding *v1.Binding - }{ - { - name: "binding can bind and validate request", - binding: &v1.Binding{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: metav1.NamespaceDefault, - Name: "foo", - }, - Target: v1.ObjectReference{ - Name: "foohost.kubernetes.mydomain.com", - }, - }, +// TODO(#87157): Move to DefaultBinding Plugin tests when it is introduced. +func TestDefaultBinding(t *testing.T) { + binding := &v1.Binding{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: metav1.NamespaceDefault, + Name: "foo", + }, + Target: v1.ObjectReference{ + Name: "foohost.kubernetes.mydomain.com", }, } - for _, test := range table { - t.Run(test.name, func(t *testing.T) { - testBind(test.binding, t) - }) - } -} - -func testBind(binding *v1.Binding, t *testing.T) { testPod := &v1.Pod{ ObjectMeta: metav1.ObjectMeta{Name: binding.GetName(), Namespace: metav1.NamespaceDefault}, Spec: apitesting.V1DeepEqualSafePodSpec(), } client := fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*testPod}}) - b := binder{client} - - if err := b.Bind(binding); err != nil { + sched := Scheduler{client: client} + if err := sched.defaultBinding(binding); err != nil { t.Errorf("Unexpected error: %v", err) return } - pod := client.CoreV1().Pods(metav1.NamespaceDefault).(*fakeV1.FakePods) - - actualBinding, err := pod.GetBinding(binding.GetName()) + pods := client.CoreV1().Pods(metav1.NamespaceDefault).(*fakeV1.FakePods) + actualBinding, err := pods.GetBinding(binding.GetName()) if err != nil { t.Fatalf("Unexpected error: %v", err) return @@ -461,6 +442,7 @@ type fakeExtender struct { isBinder bool interestedPodName string ignorable bool + gotBind bool } func (f *fakeExtender) Name() string { @@ -496,6 +478,7 @@ func (f *fakeExtender) Prioritize( func (f *fakeExtender) Bind(binding *v1.Binding) error { if f.isBinder { + f.gotBind = true return nil } return errors.New("not a binder") @@ -509,65 +492,6 @@ func (f *fakeExtender) IsInterested(pod *v1.Pod) bool { return pod != nil && pod.Name == f.interestedPodName } -func TestGetBinderFunc(t *testing.T) { - table := []struct { - podName string - extenders []algorithm.SchedulerExtender - expectedBinderType string - name string - }{ - { - name: "the extender is not a binder", - podName: "pod0", - extenders: []algorithm.SchedulerExtender{ - &fakeExtender{isBinder: false, interestedPodName: "pod0"}, - }, - expectedBinderType: "*scheduler.binder", - }, - { - name: "one of the extenders is a binder and interested in pod", - podName: "pod0", - extenders: []algorithm.SchedulerExtender{ - &fakeExtender{isBinder: false, interestedPodName: "pod0"}, - &fakeExtender{isBinder: true, interestedPodName: "pod0"}, - }, - expectedBinderType: "*scheduler.fakeExtender", - }, - { - name: "one of the extenders is a binder, but not interested in pod", - podName: "pod1", - extenders: []algorithm.SchedulerExtender{ - &fakeExtender{isBinder: false, interestedPodName: "pod1"}, - &fakeExtender{isBinder: true, interestedPodName: "pod0"}, - }, - expectedBinderType: "*scheduler.binder", - }, - } - - for _, test := range table { - t.Run(test.name, func(t *testing.T) { - testGetBinderFunc(test.expectedBinderType, test.podName, test.extenders, t) - }) - } -} - -func testGetBinderFunc(expectedBinderType, podName string, extenders []algorithm.SchedulerExtender, t *testing.T) { - pod := &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: podName, - }, - } - - f := &Configurator{} - binderFunc := getBinderFunc(f.client, extenders) - binder := binderFunc(pod) - - binderType := fmt.Sprintf("%s", reflect.TypeOf(binder)) - if binderType != expectedBinderType { - t.Errorf("Expected binder %q but got %q", expectedBinderType, binderType) - } -} - type TestPlugin struct { name string } diff --git a/pkg/scheduler/internal/cache/cache_test.go b/pkg/scheduler/internal/cache/cache_test.go index f4de7038339..420a79d4f90 100644 --- a/pkg/scheduler/internal/cache/cache_test.go +++ b/pkg/scheduler/internal/cache/cache_test.go @@ -235,6 +235,7 @@ func TestAssumePodScheduled(t *testing.T) { type testExpirePodStruct struct { pod *v1.Pod + finishBind bool assumedTime time.Time } @@ -254,6 +255,7 @@ func TestExpirePod(t *testing.T) { testPods := []*v1.Pod{ makeBasePod(t, nodeName, "test-1", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}}), makeBasePod(t, nodeName, "test-2", "200m", "1Ki", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 8080, Protocol: "TCP"}}), + makeBasePod(t, nodeName, "test-3", "200m", "1Ki", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 8080, Protocol: "TCP"}}), } now := time.Now() ttl := 10 * time.Second @@ -264,26 +266,28 @@ func TestExpirePod(t *testing.T) { wNodeInfo *schedulernodeinfo.NodeInfo }{{ // assumed pod would expires pods: []*testExpirePodStruct{ - {pod: testPods[0], assumedTime: now}, + {pod: testPods[0], finishBind: true, assumedTime: now}, }, cleanupTime: now.Add(2 * ttl), wNodeInfo: schedulernodeinfo.NewNodeInfo(), - }, { // first one would expire, second one would not. + }, { // first one would expire, second and third would not. pods: []*testExpirePodStruct{ - {pod: testPods[0], assumedTime: now}, - {pod: testPods[1], assumedTime: now.Add(3 * ttl / 2)}, + {pod: testPods[0], finishBind: true, assumedTime: now}, + {pod: testPods[1], finishBind: true, assumedTime: now.Add(3 * ttl / 2)}, + {pod: testPods[2]}, }, cleanupTime: now.Add(2 * ttl), wNodeInfo: newNodeInfo( &schedulernodeinfo.Resource{ - MilliCPU: 200, - Memory: 1024, + MilliCPU: 400, + Memory: 2048, }, &schedulernodeinfo.Resource{ - MilliCPU: 200, - Memory: 1024, + MilliCPU: 400, + Memory: 2048, }, - []*v1.Pod{testPods[1]}, + // Order gets altered when removing pods. + []*v1.Pod{testPods[2], testPods[1]}, newHostPortInfoBuilder().add("TCP", "127.0.0.1", 8080).build(), make(map[string]*schedulernodeinfo.ImageStateSummary), ), @@ -294,11 +298,18 @@ func TestExpirePod(t *testing.T) { cache := newSchedulerCache(ttl, time.Second, nil) for _, pod := range tt.pods { - if err := assumeAndFinishBinding(cache, pod.pod, pod.assumedTime); err != nil { - t.Fatalf("assumePod failed: %v", err) + if err := cache.AssumePod(pod.pod); err != nil { + t.Fatal(err) + } + if !pod.finishBind { + continue + } + if err := cache.finishBinding(pod.pod, pod.assumedTime); err != nil { + t.Fatal(err) } } - // pods that have assumedTime + ttl < cleanupTime will get expired and removed + // pods that got bound and have assumedTime + ttl < cleanupTime will get + // expired and removed cache.cleanupAssumedPods(tt.cleanupTime) n := cache.nodes[nodeName] if err := deepEqualWithoutGeneration(n, tt.wNodeInfo); err != nil { diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 4b8809cccc5..71f158d5e7c 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -82,7 +82,6 @@ type Scheduler struct { SchedulerCache internalcache.Cache Algorithm core.ScheduleAlgorithm - GetBinder func(pod *v1.Pod) Binder // PodConditionUpdater is used only in case of scheduling errors. If we succeed // with scheduling, PodScheduled condition will be updated in apiserver in /bind // handler so that binding and setting PodCondition it is atomic. @@ -119,6 +118,9 @@ type Scheduler struct { SchedulingQueue internalqueue.SchedulingQueue scheduledPodsHasSynced func() bool + + // TODO(#87157): Remove this when the DefaultBinding Plugin is introduced. + client clientset.Interface } // Cache returns the cache in scheduler for test to check the data in scheduler. @@ -333,6 +335,7 @@ func New(client clientset.Interface, sched.podConditionUpdater = &podConditionUpdaterImpl{client} sched.podPreemptor = &podPreemptorImpl{client} sched.scheduledPodsHasSynced = podInformer.Informer().HasSynced + sched.client = client AddAllEventHandlers(sched, options.schedulerName, informerFactory, podInformer) return sched, nil @@ -505,28 +508,61 @@ func (sched *Scheduler) assume(assumed *v1.Pod, host string) error { return nil } -// bind binds a pod to a given node defined in a binding object. We expect this to run asynchronously, so we -// handle binding metrics internally. -func (sched *Scheduler) bind(ctx context.Context, assumed *v1.Pod, targetNode string, state *framework.CycleState) error { - bindingStart := time.Now() - bindStatus := sched.Framework.RunBindPlugins(ctx, state, assumed, targetNode) - var err error - if !bindStatus.IsSuccess() { - if bindStatus.Code() == framework.Skip { - // All bind plugins chose to skip binding of this pod, call original binding function. - // If binding succeeds then PodScheduled condition will be updated in apiserver so that - // it's atomic with setting host. - err = sched.GetBinder(assumed).Bind(&v1.Binding{ - ObjectMeta: metav1.ObjectMeta{Namespace: assumed.Namespace, Name: assumed.Name, UID: assumed.UID}, - Target: v1.ObjectReference{ - Kind: "Node", - Name: targetNode, - }, - }) - } else { - err = fmt.Errorf("Bind failure, code: %d: %v", bindStatus.Code(), bindStatus.Message()) - } +// bind binds a pod to a given node defined in a binding object. +// The precedence for binding is: (1) extenders, (2) plugins and (3) default binding. +// We expect this to run asynchronously, so we handle binding metrics internally. +func (sched *Scheduler) bind(ctx context.Context, assumed *v1.Pod, targetNode string, state *framework.CycleState) (err error) { + start := time.Now() + defer func() { + sched.finishBinding(assumed, targetNode, start, err) + }() + + binding := &v1.Binding{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: assumed.Namespace, + Name: assumed.Name, + UID: assumed.UID, + }, + Target: v1.ObjectReference{ + Kind: "Node", + Name: targetNode, + }, } + bound, err := sched.extendersBinding(assumed, binding) + if bound { + return err + } + bindStatus := sched.Framework.RunBindPlugins(ctx, state, assumed, targetNode) + if bindStatus.IsSuccess() { + return nil + } + if bindStatus.Code() != framework.Skip { + return fmt.Errorf("bind failure, code: %d: %v", bindStatus.Code(), bindStatus.Message()) + } + // All bind plugins chose to skip binding of this pod, call original binding + // function. If binding succeeds then PodScheduled condition will be updated + // in apiserver so that it's atomic with setting host. + return sched.defaultBinding(binding) +} + +// TODO(#87159): Move this to a Plugin. +func (sched *Scheduler) extendersBinding(assumed *v1.Pod, binding *v1.Binding) (bool, error) { + for _, extender := range sched.Algorithm.Extenders() { + if !extender.IsBinder() || !extender.IsInterested(assumed) { + continue + } + return true, extender.Bind(binding) + } + return false, nil +} + +// TODO(#87157): Move this to a Plugin. +func (sched *Scheduler) defaultBinding(binding *v1.Binding) error { + klog.V(3).Infof("Attempting to bind %v/%v to %v", binding.Namespace, binding.Name, binding.Target.Name) + return sched.client.CoreV1().Pods(binding.Namespace).Bind(binding) +} + +func (sched *Scheduler) finishBinding(assumed *v1.Pod, targetNode string, start time.Time, err error) { if finErr := sched.SchedulerCache.FinishBinding(assumed); finErr != nil { klog.Errorf("scheduler cache FinishBinding failed: %v", finErr) } @@ -535,13 +571,12 @@ func (sched *Scheduler) bind(ctx context.Context, assumed *v1.Pod, targetNode st if err := sched.SchedulerCache.ForgetPod(assumed); err != nil { klog.Errorf("scheduler cache ForgetPod failed: %v", err) } - return err + return } - metrics.BindingLatency.Observe(metrics.SinceInSeconds(bindingStart)) - metrics.DeprecatedSchedulingDuration.WithLabelValues(metrics.Binding).Observe(metrics.SinceInSeconds(bindingStart)) + metrics.BindingLatency.Observe(metrics.SinceInSeconds(start)) + metrics.DeprecatedSchedulingDuration.WithLabelValues(metrics.Binding).Observe(metrics.SinceInSeconds(start)) sched.Recorder.Eventf(assumed, nil, v1.EventTypeNormal, "Scheduled", "Binding", "Successfully assigned %v/%v to %v", assumed.Namespace, assumed.Name, targetNode) - return nil } // scheduleOne does the entire scheduling workflow for a single pod. It is serialized on the scheduling algorithm's host fitting. diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index d1a05a3cb65..1417bd97725 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -27,6 +27,7 @@ import ( "testing" "time" + "github.com/google/go-cmp/cmp" v1 "k8s.io/api/core/v1" "k8s.io/api/events/v1beta1" "k8s.io/apimachinery/pkg/api/resource" @@ -34,12 +35,12 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/diff" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/informers" clientsetfake "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/scheme" + clienttesting "k8s.io/client-go/testing" clientcache "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/events" volumescheduling "k8s.io/kubernetes/pkg/controller/volume/scheduling" @@ -66,12 +67,6 @@ var ( emptyFramework, _ = framework.NewFramework(emptyPluginRegistry, nil, nil) ) -type fakeBinder struct { - b func(binding *v1.Binding) error -} - -func (fb fakeBinder) Bind(binding *v1.Binding) error { return fb.b(binding) } - type fakePodConditionUpdater struct{} func (fc fakePodConditionUpdater) update(pod *v1.Pod, podCondition *v1.PodCondition) error { @@ -258,6 +253,7 @@ func TestScheduler(t *testing.T) { expectForgetPod: podWithID("foo", testNode.Name), eventReason: "FailedScheduling", }, { + name: "deleting pod", sendPod: deletingPod("foo"), algo: mockScheduler{core.ScheduleResult{}, nil}, eventReason: "FailedScheduling", @@ -292,16 +288,18 @@ func TestScheduler(t *testing.T) { return pod.UID == gotAssumedPod.UID }, } + client := clientsetfake.NewSimpleClientset(item.sendPod) + client.PrependReactor("create", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) { + if action.GetSubresource() != "binding" { + return false, nil, nil + } + gotBinding = action.(clienttesting.CreateAction).GetObject().(*v1.Binding) + return true, gotBinding, item.injectBindError + }) s := &Scheduler{ - SchedulerCache: sCache, - Algorithm: item.algo, - GetBinder: func(pod *v1.Pod) Binder { - return fakeBinder{func(b *v1.Binding) error { - gotBinding = b - return item.injectBindError - }} - }, + SchedulerCache: sCache, + Algorithm: item.algo, podConditionUpdater: fakePodConditionUpdater{}, Error: func(p *framework.PodInfo, err error) { gotPod = p.Pod @@ -313,12 +311,13 @@ func TestScheduler(t *testing.T) { Framework: emptyFramework, Recorder: eventBroadcaster.NewRecorder(scheme.Scheme, "scheduler"), VolumeBinder: volumebinder.NewFakeVolumeBinder(&volumescheduling.FakeVolumeBinderConfig{AllBound: true}), + client: client, } called := make(chan struct{}) stopFunc := eventBroadcaster.StartEventWatcher(func(obj runtime.Object) { e, _ := obj.(*v1beta1.Event) - if e, a := item.eventReason, e.Reason; e != a { - t.Errorf("expected %v, got %v", e, a) + if e.Reason != item.eventReason { + t.Errorf("got event %v, want %v", e.Reason, item.eventReason) } close(called) }) @@ -336,8 +335,8 @@ func TestScheduler(t *testing.T) { if e, a := item.expectError, gotError; !reflect.DeepEqual(e, a) { t.Errorf("error: wanted %v, got %v", e, a) } - if e, a := item.expectBind, gotBinding; !reflect.DeepEqual(e, a) { - t.Errorf("error: %s", diff.ObjectDiff(e, a)) + if diff := cmp.Diff(item.expectBind, gotBinding); diff != "" { + t.Errorf("got binding diff (-want, +got): %s", diff) } stopFunc() }) @@ -481,78 +480,6 @@ func TestSchedulerNoPhantomPodAfterDelete(t *testing.T) { } } -// Scheduler should preserve predicate constraint even if binding was longer -// than cache ttl -func TestSchedulerErrorWithLongBinding(t *testing.T) { - stop := make(chan struct{}) - defer close(stop) - - firstPod := podWithPort("foo", "", 8080) - conflictPod := podWithPort("bar", "", 8080) - pods := map[string]*v1.Pod{firstPod.Name: firstPod, conflictPod.Name: conflictPod} - for _, test := range []struct { - name string - Expected map[string]bool - CacheTTL time.Duration - BindingDuration time.Duration - }{ - { - name: "long cache ttl", - Expected: map[string]bool{firstPod.Name: true}, - CacheTTL: 100 * time.Millisecond, - BindingDuration: 300 * time.Millisecond, - }, - { - name: "short cache ttl", - Expected: map[string]bool{firstPod.Name: true}, - CacheTTL: 10 * time.Second, - BindingDuration: 300 * time.Millisecond, - }, - } { - t.Run(test.name, func(t *testing.T) { - queuedPodStore := clientcache.NewFIFO(clientcache.MetaNamespaceKeyFunc) - scache := internalcache.New(test.CacheTTL, stop) - - node := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}} - scache.AddNode(&node) - - client := clientsetfake.NewSimpleClientset(&node) - informerFactory := informers.NewSharedInformerFactory(client, 0) - fns := []st.RegisterPluginFunc{ - st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), - st.RegisterPluginAsExtensions(nodeports.Name, 1, nodeports.New, "Filter", "PreFilter"), - } - - scheduler, bindingChan := setupTestSchedulerLongBindingWithRetry( - queuedPodStore, scache, informerFactory, stop, test.BindingDuration, fns...) - - informerFactory.Start(stop) - informerFactory.WaitForCacheSync(stop) - - go scheduler.Run(context.Background()) - queuedPodStore.Add(firstPod) - queuedPodStore.Add(conflictPod) - - resultBindings := map[string]bool{} - waitChan := time.After(5 * time.Second) - for finished := false; !finished; { - select { - case b := <-bindingChan: - resultBindings[b.Name] = true - p := pods[b.Name] - p.Spec.NodeName = b.Target.Name - scache.AddPod(p) - case <-waitChan: - finished = true - } - } - if !reflect.DeepEqual(resultBindings, test.Expected) { - t.Errorf("Result binding are not equal to expected. %v != %v", resultBindings, test.Expected) - } - }) - } -} - // queuedPodStore: pods queued before processing. // cache: scheduler cache that might contain assumed pods. func setupTestSchedulerWithOnePodOnNode(t *testing.T, queuedPodStore *clientcache.FIFO, scache internalcache.Cache, @@ -698,15 +625,19 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C bindingChan := make(chan *v1.Binding, 1) errChan := make(chan error, 1) + client := clientsetfake.NewSimpleClientset() + client.PrependReactor("create", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) { + var b *v1.Binding + if action.GetSubresource() == "binding" { + b := action.(clienttesting.CreateAction).GetObject().(*v1.Binding) + bindingChan <- b + } + return true, b, nil + }) + sched := &Scheduler{ SchedulerCache: scache, Algorithm: algo, - GetBinder: func(pod *v1.Pod) Binder { - return fakeBinder{func(b *v1.Binding) error { - bindingChan <- b - return nil - }} - }, NextPod: func() *framework.PodInfo { return &framework.PodInfo{Pod: clientcache.Pop(queuedPodStore).(*v1.Pod)} }, @@ -718,6 +649,7 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C podPreemptor: fakePodPreemptor{}, Framework: fwk, VolumeBinder: volumebinder.NewFakeVolumeBinder(&volumescheduling.FakeVolumeBinderConfig{AllBound: true}), + client: client, } if recorder != nil { @@ -727,66 +659,6 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C return sched, bindingChan, errChan } -func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, scache internalcache.Cache, informerFactory informers.SharedInformerFactory, stop chan struct{}, bindingTime time.Duration, fns ...st.RegisterPluginFunc) (*Scheduler, chan *v1.Binding) { - registry := framework.Registry{} - // TODO: instantiate the plugins dynamically. - plugins := &schedulerapi.Plugins{ - QueueSort: &schedulerapi.PluginSet{}, - PreFilter: &schedulerapi.PluginSet{}, - Filter: &schedulerapi.PluginSet{}, - } - var pluginConfigs []schedulerapi.PluginConfig - for _, f := range fns { - f(®istry, plugins, pluginConfigs) - } - fwk, _ := framework.NewFramework(registry, plugins, pluginConfigs) - queue := internalqueue.NewSchedulingQueue(nil) - algo := core.NewGenericScheduler( - scache, - queue, - internalcache.NewEmptySnapshot(), - fwk, - []algorithm.SchedulerExtender{}, - nil, - informerFactory.Core().V1().PersistentVolumeClaims().Lister(), - informerFactory.Policy().V1beta1().PodDisruptionBudgets().Lister(), - false, - schedulerapi.DefaultPercentageOfNodesToScore, - false, - ) - bindingChan := make(chan *v1.Binding, 2) - - sched := &Scheduler{ - SchedulerCache: scache, - Algorithm: algo, - GetBinder: func(pod *v1.Pod) Binder { - return fakeBinder{func(b *v1.Binding) error { - time.Sleep(bindingTime) - bindingChan <- b - return nil - }} - }, - scheduledPodsHasSynced: func() bool { - return true - }, - NextPod: func() *framework.PodInfo { - return &framework.PodInfo{Pod: clientcache.Pop(queuedPodStore).(*v1.Pod)} - }, - Error: func(p *framework.PodInfo, err error) { - queuedPodStore.AddIfNotPresent(p) - }, - Recorder: &events.FakeRecorder{}, - podConditionUpdater: fakePodConditionUpdater{}, - podPreemptor: fakePodPreemptor{}, - StopEverything: stop, - Framework: fwk, - VolumeBinder: volumebinder.NewFakeVolumeBinder(&volumescheduling.FakeVolumeBinderConfig{AllBound: true}), - SchedulingQueue: queue, - } - - return sched, bindingChan -} - func setupTestSchedulerWithVolumeBinding(fakeVolumeBinder *volumebinder.VolumeBinder, stop <-chan struct{}, broadcaster events.EventBroadcaster) (*Scheduler, chan *v1.Binding, chan error) { testNode := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}} queuedPodStore := clientcache.NewFIFO(clientcache.MetaNamespaceKeyFunc) @@ -1065,3 +937,102 @@ priorities: } } } + +func TestSchedulerBinding(t *testing.T) { + table := []struct { + podName string + extenders []algorithm.SchedulerExtender + wantBinderID int + name string + }{ + { + name: "the extender is not a binder", + podName: "pod0", + extenders: []algorithm.SchedulerExtender{ + &fakeExtender{isBinder: false, interestedPodName: "pod0"}, + }, + wantBinderID: -1, // default binding. + }, + { + name: "one of the extenders is a binder and interested in pod", + podName: "pod0", + extenders: []algorithm.SchedulerExtender{ + &fakeExtender{isBinder: false, interestedPodName: "pod0"}, + &fakeExtender{isBinder: true, interestedPodName: "pod0"}, + }, + wantBinderID: 1, + }, + { + name: "one of the extenders is a binder, but not interested in pod", + podName: "pod1", + extenders: []algorithm.SchedulerExtender{ + &fakeExtender{isBinder: false, interestedPodName: "pod1"}, + &fakeExtender{isBinder: true, interestedPodName: "pod0"}, + }, + wantBinderID: -1, // default binding. + }, + } + + for _, test := range table { + t.Run(test.name, func(t *testing.T) { + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: test.podName, + }, + } + defaultBound := false + client := clientsetfake.NewSimpleClientset(pod) + client.PrependReactor("create", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) { + if action.GetSubresource() == "binding" { + defaultBound = true + } + return false, nil, nil + }) + fwk, err := framework.NewFramework(framework.Registry{}, nil, nil) + if err != nil { + t.Fatal(err) + } + stop := make(chan struct{}) + defer close(stop) + scache := internalcache.New(100*time.Millisecond, stop) + algo := core.NewGenericScheduler( + scache, + nil, + nil, + fwk, + test.extenders, + nil, + nil, + nil, + false, + 0, + false, + ) + sched := Scheduler{ + Algorithm: algo, + Framework: fwk, + Recorder: &events.FakeRecorder{}, + SchedulerCache: scache, + client: client, + } + err = sched.bind(context.Background(), pod, "node", nil) + if err != nil { + t.Error(err) + } + + // Checking default binding. + if wantBound := test.wantBinderID == -1; defaultBound != wantBound { + t.Errorf("got bound with default binding: %v, want %v", defaultBound, wantBound) + } + + // Checking extenders binding. + for i, ext := range test.extenders { + wantBound := i == test.wantBinderID + if gotBound := ext.(*fakeExtender).gotBind; gotBound != wantBound { + t.Errorf("got bound with extender #%d: %v, want %v", i, gotBound, wantBound) + } + } + + }) + } +}