diff --git a/plugin/cmd/kube-scheduler/app/server.go b/plugin/cmd/kube-scheduler/app/server.go
index d94ea41f96d..28054627957 100644
--- a/plugin/cmd/kube-scheduler/app/server.go
+++ b/plugin/cmd/kube-scheduler/app/server.go
@@ -79,8 +79,8 @@ func Run(s *options.SchedulerServer) error {
 	informerFactory := informers.NewSharedInformerFactory(kubeClient, 0)
 	// cache only non-terminal pods
-	podInformer := factory.NewPodInformer(kubeClient, 0)
+	podInformer := factory.NewPodInformer(kubeClient, 0, s.SchedulerName)
 
 	// Apply algorithms based on feature gates.
 	algorithmprovider.ApplyFeatureGates()
diff --git a/plugin/pkg/scheduler/factory/factory.go b/plugin/pkg/scheduler/factory/factory.go
index 01c6468ae9a..b83010eda54 100644
--- a/plugin/pkg/scheduler/factory/factory.go
+++ b/plugin/pkg/scheduler/factory/factory.go
@@ -901,17 +901,9 @@ func (f *configFactory) getPluginArgs() (*PluginFactoryArgs, error) {
 }
 
 func (f *configFactory) getNextPod() *v1.Pod {
-	for {
-		pod := cache.Pop(f.podQueue).(*v1.Pod)
-		if f.ResponsibleForPod(pod) {
-			glog.V(4).Infof("About to try and schedule pod %v", pod.Name)
-			return pod
-		}
-	}
-}
-
-func (f *configFactory) ResponsibleForPod(pod *v1.Pod) bool {
-	return f.schedulerName == pod.Spec.SchedulerName
+	pod := cache.Pop(f.podQueue).(*v1.Pod)
+	glog.V(4).Infof("About to try and schedule pod %v", pod.Name)
+	return pod
 }
 
 // unassignedNonTerminatedPod selects pods that are unassigned and non-terminal.
@@ -1008,8 +1000,11 @@ func (i *podInformer) Lister() corelisters.PodLister {
 }
 
 // NewPodInformer creates a shared index informer that returns only non-terminal pods.
-func NewPodInformer(client clientset.Interface, resyncPeriod time.Duration) coreinformers.PodInformer {
-	selector := fields.ParseSelectorOrDie("status.phase!=" + string(v1.PodSucceeded) + ",status.phase!=" + string(v1.PodFailed))
+func NewPodInformer(client clientset.Interface, resyncPeriod time.Duration, schedulerName string) coreinformers.PodInformer {
+	selector := fields.ParseSelectorOrDie(
+		"spec.schedulerName=" + schedulerName +
+			",status.phase!=" + string(v1.PodSucceeded) +
+			",status.phase!=" + string(v1.PodFailed))
 	lw := cache.NewListWatchFromClient(client.CoreV1().RESTClient(), string(v1.ResourcePods), metav1.NamespaceAll, selector)
 	return &podInformer{
 		informer: cache.NewSharedIndexInformer(lw, &v1.Pod{}, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}),
diff --git a/plugin/pkg/scheduler/factory/factory_test.go b/plugin/pkg/scheduler/factory/factory_test.go
index fcfe1d51161..2769fa27b9f 100644
--- a/plugin/pkg/scheduler/factory/factory_test.go
+++ b/plugin/pkg/scheduler/factory/factory_test.go
@@ -363,97 +363,6 @@ func TestBind(t *testing.T) {
 	}
 }
 
-// TestResponsibleForPod tests if a pod with an annotation that should cause it to
-// be picked up by the default scheduler, is in fact picked by the default scheduler
-// Two schedulers are made in the test: one is default scheduler and other scheduler
-// is of name "foo-scheduler". A pod must be picked up by at most one of the two
-// schedulers.
-func TestResponsibleForPod(t *testing.T) {
-	handler := utiltesting.FakeHandler{
-		StatusCode:   500,
-		ResponseBody: "",
-		T:            t,
-	}
-	server := httptest.NewServer(&handler)
-	defer server.Close()
-	client := clientset.NewForConfigOrDie(&restclient.Config{Host: server.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &legacyscheme.Registry.GroupOrDie(v1.GroupName).GroupVersion}})
-	// factory of "default-scheduler"
-	informerFactory := informers.NewSharedInformerFactory(client, 0)
-	factoryDefaultScheduler := NewConfigFactory(
-		v1.DefaultSchedulerName,
-		client,
-		informerFactory.Core().V1().Nodes(),
-		informerFactory.Core().V1().Pods(),
-		informerFactory.Core().V1().PersistentVolumes(),
-		informerFactory.Core().V1().PersistentVolumeClaims(),
-		informerFactory.Core().V1().ReplicationControllers(),
-		informerFactory.Extensions().V1beta1().ReplicaSets(),
-		informerFactory.Apps().V1beta1().StatefulSets(),
-		informerFactory.Core().V1().Services(),
-		informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
-		v1.DefaultHardPodAffinitySymmetricWeight,
-		enableEquivalenceCache,
-	)
-	// factory of "foo-scheduler"
-	factoryFooScheduler := NewConfigFactory(
-		"foo-scheduler",
-		client,
-		informerFactory.Core().V1().Nodes(),
-		informerFactory.Core().V1().Pods(),
-		informerFactory.Core().V1().PersistentVolumes(),
-		informerFactory.Core().V1().PersistentVolumeClaims(),
-		informerFactory.Core().V1().ReplicationControllers(),
-		informerFactory.Extensions().V1beta1().ReplicaSets(),
-		informerFactory.Apps().V1beta1().StatefulSets(),
-		informerFactory.Core().V1().Services(),
-		informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
-		v1.DefaultHardPodAffinitySymmetricWeight,
-		enableEquivalenceCache,
-	)
-	// scheduler annotations to be tested
-	schedulerFitsDefault := "default-scheduler"
-	schedulerFitsFoo := "foo-scheduler"
-	schedulerFitsNone := "bar-scheduler"
-
-	tests := []struct {
-		pod             *v1.Pod
-		pickedByDefault bool
-		pickedByFoo     bool
-	}{
-		{
-			// pod with "spec.Schedulername=default-scheduler" should be picked
-			// by the scheduler of name "default-scheduler", NOT by the one of name "foo-scheduler"
-			pod:             &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "bar"}, Spec: v1.PodSpec{SchedulerName: schedulerFitsDefault}},
-			pickedByDefault: true,
-			pickedByFoo:     false,
-		},
-		{
-			// pod with "spec.SchedulerName=foo-scheduler" should be NOT
-			// be picked by the scheduler of name "default-scheduler", but by the one of name "foo-scheduler"
-			pod:             &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "bar"}, Spec: v1.PodSpec{SchedulerName: schedulerFitsFoo}},
-			pickedByDefault: false,
-			pickedByFoo:     true,
-		},
-		{
-			// pod with "spec.SchedulerName=foo-scheduler" should be NOT
-			// be picked by niether the scheduler of name "default-scheduler" nor the one of name "foo-scheduler"
-			pod:             &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "bar"}, Spec: v1.PodSpec{SchedulerName: schedulerFitsNone}},
-			pickedByDefault: false,
-			pickedByFoo:     false,
-		},
-	}
-
-	for _, test := range tests {
-		podOfDefault := factoryDefaultScheduler.ResponsibleForPod(test.pod)
-		podOfFoo := factoryFooScheduler.ResponsibleForPod(test.pod)
-		results := []bool{podOfDefault, podOfFoo}
-		expected := []bool{test.pickedByDefault, test.pickedByFoo}
-		if !reflect.DeepEqual(results, expected) {
-			t.Errorf("expected: {%v, %v}, got {%v, %v}", test.pickedByDefault, test.pickedByFoo, podOfDefault, podOfFoo)
-		}
-	}
-}
-
 func TestInvalidHardPodAffinitySymmetricWeight(t *testing.T) {
 	handler := utiltesting.FakeHandler{
 		StatusCode: 500,
diff --git a/plugin/pkg/scheduler/scheduler.go b/plugin/pkg/scheduler/scheduler.go
index 729ac23a6cc..33f60241c12 100644
--- a/plugin/pkg/scheduler/scheduler.go
+++ b/plugin/pkg/scheduler/scheduler.go
@@ -82,9 +82,6 @@ type Configurator interface {
 	GetSchedulerName() string
 	MakeDefaultErrorFunc(backoff *util.PodBackoff, podQueue *cache.FIFO) func(pod *v1.Pod, err error)
 
-	// Probably doesn't need to be public. But exposed for now in case.
-	ResponsibleForPod(pod *v1.Pod) bool
-
 	// Needs to be exposed for things like integration tests where we want to make fake nodes.
 	GetNodeLister() corelisters.NodeLister
 	GetClient() clientset.Interface
diff --git a/test/integration/scheduler/scheduler_test.go b/test/integration/scheduler/scheduler_test.go
index ae60fa5fc47..08c0196f8cf 100644
--- a/test/integration/scheduler/scheduler_test.go
+++ b/test/integration/scheduler/scheduler_test.go
@@ -464,6 +464,7 @@ func TestMultiScheduler(t *testing.T) {
 	context.clientSet.CoreV1().Nodes().Create(node)
 
 	// 3. create 3 pods for testing
+	t.Logf("create 3 pods for testing")
 	testPod, err := createPausePodWithResource(context.clientSet, "pod-without-scheduler-name", context.ns.Name, nil)
 	if err != nil {
 		t.Fatalf("Failed to create pod: %v", err)
 	}
@@ -484,6 +485,7 @@ func TestMultiScheduler(t *testing.T) {
 	// 4. **check point-1**:
 	//    - testPod, testPodFitsDefault should be scheduled
 	//    - testPodFitsFoo should NOT be scheduled
+	t.Logf("wait for pods scheduled")
 	if err := waitForPodToSchedule(context.clientSet, testPod); err != nil {
 		t.Errorf("Test MultiScheduler: %s Pod not scheduled: %v", testPod.Name, err)
 	} else {
@@ -505,12 +507,13 @@ func TestMultiScheduler(t *testing.T) {
 	// 5. create and start a scheduler with name "foo-scheduler"
 	clientSet2 := clientset.NewForConfigOrDie(&restclient.Config{Host: context.httpServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Groups[v1.GroupName].GroupVersion()}})
 	informerFactory2 := informers.NewSharedInformerFactory(context.clientSet, 0)
+	podInformer2 := factory.NewPodInformer(context.clientSet, 0, fooScheduler)
 
 	schedulerConfigFactory2 := factory.NewConfigFactory(
 		fooScheduler,
 		clientSet2,
 		informerFactory2.Core().V1().Nodes(),
-		informerFactory2.Core().V1().Pods(),
+		podInformer2,
 		informerFactory2.Core().V1().PersistentVolumes(),
 		informerFactory2.Core().V1().PersistentVolumeClaims(),
 		informerFactory2.Core().V1().ReplicationControllers(),
@@ -528,6 +531,7 @@ func TestMultiScheduler(t *testing.T) {
 	eventBroadcaster2 := record.NewBroadcaster()
 	schedulerConfig2.Recorder = eventBroadcaster2.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: fooScheduler})
 	eventBroadcaster2.StartRecordingToSink(&clientv1core.EventSinkImpl{Interface: clientv1core.New(clientSet2.CoreV1().RESTClient()).Events("")})
+	go podInformer2.Informer().Run(schedulerConfig2.StopEverything)
 	informerFactory2.Start(schedulerConfig2.StopEverything)
 
 	sched2, _ := scheduler.NewFromConfigurator(&scheduler.FakeConfigurator{Config: schedulerConfig2}, nil...)
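
Aside (not part of the patch): the multi-scheduler behaviour exercised above rests on the field selector that NewPodInformer now builds, which matches spec.schedulerName exactly. Below is a minimal, self-contained Go sketch of that selector, evaluated locally with the apimachinery fields package, roughly the way the apiserver evaluates it server-side for list/watch requests; the scheduler names and pod field values are made up for the example.

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/fields"
)

func main() {
	// Same shape of selector NewPodInformer builds; "foo-scheduler" is an
	// illustrative scheduler name.
	selector := fields.ParseSelectorOrDie(
		"spec.schedulerName=foo-scheduler" +
			",status.phase!=Succeeded" +
			",status.phase!=Failed")

	// A non-terminal pod that names this scheduler matches the selector...
	fmt.Println(selector.Matches(fields.Set{
		"spec.schedulerName": "foo-scheduler",
		"status.phase":       "Running",
	})) // true

	// ...while a pod naming any other scheduler is filtered out before it
	// ever reaches this scheduler's informer cache or podQueue.
	fmt.Println(selector.Matches(fields.Set{
		"spec.schedulerName": "bar-scheduler",
		"status.phase":       "Pending",
	})) // false
}

Because the match is exact, a pod that names a scheduler nobody runs never enters any scheduler's cache, mirroring the "bar-scheduler" case in the removed TestResponsibleForPod unit test.
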
diff --git a/test/integration/scheduler/util.go b/test/integration/scheduler/util.go
index 6cd8af88b1e..10517e3a859 100644
--- a/test/integration/scheduler/util.go
+++ b/test/integration/scheduler/util.go
@@ -64,12 +64,12 @@ func initTest(t *testing.T, nsPrefix string) *TestContext {
 	context.clientSet = clientset.NewForConfigOrDie(&restclient.Config{Host: context.httpServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Groups[v1.GroupName].GroupVersion()}})
 	context.informerFactory = informers.NewSharedInformerFactory(context.clientSet, 0)
-
+	podInformer := factory.NewPodInformer(context.clientSet, 30*time.Second, v1.DefaultSchedulerName)
 	context.schedulerConfigFactory = factory.NewConfigFactory(
 		v1.DefaultSchedulerName,
 		context.clientSet,
 		context.informerFactory.Core().V1().Nodes(),
-		context.informerFactory.Core().V1().Pods(),
+		podInformer,
 		context.informerFactory.Core().V1().PersistentVolumes(),
 		context.informerFactory.Core().V1().PersistentVolumeClaims(),
 		context.informerFactory.Core().V1().ReplicationControllers(),
@@ -88,6 +88,7 @@ func initTest(t *testing.T, nsPrefix string) *TestContext {
 	eventBroadcaster := record.NewBroadcaster()
 	context.schedulerConfig.Recorder = eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: v1.DefaultSchedulerName})
 	eventBroadcaster.StartRecordingToSink(&clientv1core.EventSinkImpl{Interface: clientv1core.New(context.clientSet.CoreV1().RESTClient()).Events("")})
+	go podInformer.Informer().Run(context.schedulerConfig.StopEverything)
 	context.informerFactory.Start(context.schedulerConfig.StopEverything)
 	context.scheduler, err = scheduler.NewFromConfigurator(&scheduler.FakeConfigurator{Config: context.schedulerConfig}, nil...)
 	if err != nil {
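
For orientation (again, not part of the patch): the filtered informer that initTest and TestMultiScheduler now start by hand can be reproduced outside the scheduler with plain client-go, using the same calls NewPodInformer makes. A rough standalone sketch follows; the kubeconfig handling and the "default-scheduler" name are assumptions for illustration, and it expects a reachable cluster.

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Assumption: a cluster reachable through the default kubeconfig.
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(config)

	// Same server-side filter NewPodInformer builds: one scheduler's pods,
	// minus terminal phases, so terminal pods never reach the local cache.
	selector := fields.ParseSelectorOrDie(
		"spec.schedulerName=default-scheduler" +
			",status.phase!=" + string(v1.PodSucceeded) +
			",status.phase!=" + string(v1.PodFailed))

	lw := cache.NewListWatchFromClient(client.CoreV1().RESTClient(), string(v1.ResourcePods), metav1.NamespaceAll, selector)
	informer := cache.NewSharedIndexInformer(lw, &v1.Pod{}, 0, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})

	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			pod := obj.(*v1.Pod)
			fmt.Printf("cached pod %s/%s (phase %s)\n", pod.Namespace, pod.Name, pod.Status.Phase)
		},
	})

	// A hand-built informer is unknown to any SharedInformerFactory, which is
	// why the test changes above add an explicit go podInformer.Informer().Run(stop).
	stop := make(chan struct{})
	informer.Run(stop)
}

Run against a live cluster, this should report only pods whose spec.schedulerName is default-scheduler and that have not reached Succeeded or Failed, which is the invariant that lets getNextPod drop the ResponsibleForPod check.
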