mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-22 03:11:40 +00:00
update scheduler to use schedulerName selector
This commit is contained in:
parent
17f6833471
commit
4b185b4db9
@ -79,8 +79,8 @@ func Run(s *options.SchedulerServer) error {
|
|||||||
|
|
||||||
informerFactory := informers.NewSharedInformerFactory(kubeClient, 0)
|
informerFactory := informers.NewSharedInformerFactory(kubeClient, 0)
|
||||||
// cache only non-terminal pods
|
// cache only non-terminal pods
|
||||||
podInformer := factory.NewPodInformer(kubeClient, 0)
|
|
||||||
|
|
||||||
|
podInformer := factory.NewPodInformer(kubeClient, 0, s.SchedulerName)
|
||||||
// Apply algorithms based on feature gates.
|
// Apply algorithms based on feature gates.
|
||||||
algorithmprovider.ApplyFeatureGates()
|
algorithmprovider.ApplyFeatureGates()
|
||||||
|
|
||||||
|
@ -901,17 +901,9 @@ func (f *configFactory) getPluginArgs() (*PluginFactoryArgs, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (f *configFactory) getNextPod() *v1.Pod {
|
func (f *configFactory) getNextPod() *v1.Pod {
|
||||||
for {
|
pod := cache.Pop(f.podQueue).(*v1.Pod)
|
||||||
pod := cache.Pop(f.podQueue).(*v1.Pod)
|
glog.V(4).Infof("About to try and schedule pod %v", pod.Name)
|
||||||
if f.ResponsibleForPod(pod) {
|
return pod
|
||||||
glog.V(4).Infof("About to try and schedule pod %v", pod.Name)
|
|
||||||
return pod
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *configFactory) ResponsibleForPod(pod *v1.Pod) bool {
|
|
||||||
return f.schedulerName == pod.Spec.SchedulerName
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// unassignedNonTerminatedPod selects pods that are unassigned and non-terminal.
|
// unassignedNonTerminatedPod selects pods that are unassigned and non-terminal.
|
||||||
@ -1008,8 +1000,11 @@ func (i *podInformer) Lister() corelisters.PodLister {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// NewPodInformer creates a shared index informer that returns only non-terminal pods.
|
// NewPodInformer creates a shared index informer that returns only non-terminal pods.
|
||||||
func NewPodInformer(client clientset.Interface, resyncPeriod time.Duration) coreinformers.PodInformer {
|
func NewPodInformer(client clientset.Interface, resyncPeriod time.Duration, schedulerName string) coreinformers.PodInformer {
|
||||||
selector := fields.ParseSelectorOrDie("status.phase!=" + string(v1.PodSucceeded) + ",status.phase!=" + string(v1.PodFailed))
|
selector := fields.ParseSelectorOrDie(
|
||||||
|
"spec.schedulerName=" + schedulerName +
|
||||||
|
",status.phase!=" + string(v1.PodSucceeded) +
|
||||||
|
",status.phase!=" + string(v1.PodFailed))
|
||||||
lw := cache.NewListWatchFromClient(client.CoreV1().RESTClient(), string(v1.ResourcePods), metav1.NamespaceAll, selector)
|
lw := cache.NewListWatchFromClient(client.CoreV1().RESTClient(), string(v1.ResourcePods), metav1.NamespaceAll, selector)
|
||||||
return &podInformer{
|
return &podInformer{
|
||||||
informer: cache.NewSharedIndexInformer(lw, &v1.Pod{}, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}),
|
informer: cache.NewSharedIndexInformer(lw, &v1.Pod{}, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}),
|
||||||
|
@ -363,97 +363,6 @@ func TestBind(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TestResponsibleForPod tests if a pod with an annotation that should cause it to
|
|
||||||
// be picked up by the default scheduler, is in fact picked by the default scheduler
|
|
||||||
// Two schedulers are made in the test: one is default scheduler and other scheduler
|
|
||||||
// is of name "foo-scheduler". A pod must be picked up by at most one of the two
|
|
||||||
// schedulers.
|
|
||||||
func TestResponsibleForPod(t *testing.T) {
|
|
||||||
handler := utiltesting.FakeHandler{
|
|
||||||
StatusCode: 500,
|
|
||||||
ResponseBody: "",
|
|
||||||
T: t,
|
|
||||||
}
|
|
||||||
server := httptest.NewServer(&handler)
|
|
||||||
defer server.Close()
|
|
||||||
client := clientset.NewForConfigOrDie(&restclient.Config{Host: server.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &legacyscheme.Registry.GroupOrDie(v1.GroupName).GroupVersion}})
|
|
||||||
// factory of "default-scheduler"
|
|
||||||
informerFactory := informers.NewSharedInformerFactory(client, 0)
|
|
||||||
factoryDefaultScheduler := NewConfigFactory(
|
|
||||||
v1.DefaultSchedulerName,
|
|
||||||
client,
|
|
||||||
informerFactory.Core().V1().Nodes(),
|
|
||||||
informerFactory.Core().V1().Pods(),
|
|
||||||
informerFactory.Core().V1().PersistentVolumes(),
|
|
||||||
informerFactory.Core().V1().PersistentVolumeClaims(),
|
|
||||||
informerFactory.Core().V1().ReplicationControllers(),
|
|
||||||
informerFactory.Extensions().V1beta1().ReplicaSets(),
|
|
||||||
informerFactory.Apps().V1beta1().StatefulSets(),
|
|
||||||
informerFactory.Core().V1().Services(),
|
|
||||||
informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
|
|
||||||
v1.DefaultHardPodAffinitySymmetricWeight,
|
|
||||||
enableEquivalenceCache,
|
|
||||||
)
|
|
||||||
// factory of "foo-scheduler"
|
|
||||||
factoryFooScheduler := NewConfigFactory(
|
|
||||||
"foo-scheduler",
|
|
||||||
client,
|
|
||||||
informerFactory.Core().V1().Nodes(),
|
|
||||||
informerFactory.Core().V1().Pods(),
|
|
||||||
informerFactory.Core().V1().PersistentVolumes(),
|
|
||||||
informerFactory.Core().V1().PersistentVolumeClaims(),
|
|
||||||
informerFactory.Core().V1().ReplicationControllers(),
|
|
||||||
informerFactory.Extensions().V1beta1().ReplicaSets(),
|
|
||||||
informerFactory.Apps().V1beta1().StatefulSets(),
|
|
||||||
informerFactory.Core().V1().Services(),
|
|
||||||
informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
|
|
||||||
v1.DefaultHardPodAffinitySymmetricWeight,
|
|
||||||
enableEquivalenceCache,
|
|
||||||
)
|
|
||||||
// scheduler annotations to be tested
|
|
||||||
schedulerFitsDefault := "default-scheduler"
|
|
||||||
schedulerFitsFoo := "foo-scheduler"
|
|
||||||
schedulerFitsNone := "bar-scheduler"
|
|
||||||
|
|
||||||
tests := []struct {
|
|
||||||
pod *v1.Pod
|
|
||||||
pickedByDefault bool
|
|
||||||
pickedByFoo bool
|
|
||||||
}{
|
|
||||||
{
|
|
||||||
// pod with "spec.Schedulername=default-scheduler" should be picked
|
|
||||||
// by the scheduler of name "default-scheduler", NOT by the one of name "foo-scheduler"
|
|
||||||
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "bar"}, Spec: v1.PodSpec{SchedulerName: schedulerFitsDefault}},
|
|
||||||
pickedByDefault: true,
|
|
||||||
pickedByFoo: false,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
// pod with "spec.SchedulerName=foo-scheduler" should be NOT
|
|
||||||
// be picked by the scheduler of name "default-scheduler", but by the one of name "foo-scheduler"
|
|
||||||
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "bar"}, Spec: v1.PodSpec{SchedulerName: schedulerFitsFoo}},
|
|
||||||
pickedByDefault: false,
|
|
||||||
pickedByFoo: true,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
// pod with "spec.SchedulerName=foo-scheduler" should be NOT
|
|
||||||
// be picked by niether the scheduler of name "default-scheduler" nor the one of name "foo-scheduler"
|
|
||||||
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "bar"}, Spec: v1.PodSpec{SchedulerName: schedulerFitsNone}},
|
|
||||||
pickedByDefault: false,
|
|
||||||
pickedByFoo: false,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, test := range tests {
|
|
||||||
podOfDefault := factoryDefaultScheduler.ResponsibleForPod(test.pod)
|
|
||||||
podOfFoo := factoryFooScheduler.ResponsibleForPod(test.pod)
|
|
||||||
results := []bool{podOfDefault, podOfFoo}
|
|
||||||
expected := []bool{test.pickedByDefault, test.pickedByFoo}
|
|
||||||
if !reflect.DeepEqual(results, expected) {
|
|
||||||
t.Errorf("expected: {%v, %v}, got {%v, %v}", test.pickedByDefault, test.pickedByFoo, podOfDefault, podOfFoo)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestInvalidHardPodAffinitySymmetricWeight(t *testing.T) {
|
func TestInvalidHardPodAffinitySymmetricWeight(t *testing.T) {
|
||||||
handler := utiltesting.FakeHandler{
|
handler := utiltesting.FakeHandler{
|
||||||
StatusCode: 500,
|
StatusCode: 500,
|
||||||
|
@ -82,9 +82,6 @@ type Configurator interface {
|
|||||||
GetSchedulerName() string
|
GetSchedulerName() string
|
||||||
MakeDefaultErrorFunc(backoff *util.PodBackoff, podQueue *cache.FIFO) func(pod *v1.Pod, err error)
|
MakeDefaultErrorFunc(backoff *util.PodBackoff, podQueue *cache.FIFO) func(pod *v1.Pod, err error)
|
||||||
|
|
||||||
// Probably doesn't need to be public. But exposed for now in case.
|
|
||||||
ResponsibleForPod(pod *v1.Pod) bool
|
|
||||||
|
|
||||||
// Needs to be exposed for things like integration tests where we want to make fake nodes.
|
// Needs to be exposed for things like integration tests where we want to make fake nodes.
|
||||||
GetNodeLister() corelisters.NodeLister
|
GetNodeLister() corelisters.NodeLister
|
||||||
GetClient() clientset.Interface
|
GetClient() clientset.Interface
|
||||||
|
@ -464,6 +464,7 @@ func TestMultiScheduler(t *testing.T) {
|
|||||||
context.clientSet.CoreV1().Nodes().Create(node)
|
context.clientSet.CoreV1().Nodes().Create(node)
|
||||||
|
|
||||||
// 3. create 3 pods for testing
|
// 3. create 3 pods for testing
|
||||||
|
t.Logf("create 3 pods for testing")
|
||||||
testPod, err := createPausePodWithResource(context.clientSet, "pod-without-scheduler-name", context.ns.Name, nil)
|
testPod, err := createPausePodWithResource(context.clientSet, "pod-without-scheduler-name", context.ns.Name, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Failed to create pod: %v", err)
|
t.Fatalf("Failed to create pod: %v", err)
|
||||||
@ -484,6 +485,7 @@ func TestMultiScheduler(t *testing.T) {
|
|||||||
// 4. **check point-1**:
|
// 4. **check point-1**:
|
||||||
// - testPod, testPodFitsDefault should be scheduled
|
// - testPod, testPodFitsDefault should be scheduled
|
||||||
// - testPodFitsFoo should NOT be scheduled
|
// - testPodFitsFoo should NOT be scheduled
|
||||||
|
t.Logf("wait for pods scheduled")
|
||||||
if err := waitForPodToSchedule(context.clientSet, testPod); err != nil {
|
if err := waitForPodToSchedule(context.clientSet, testPod); err != nil {
|
||||||
t.Errorf("Test MultiScheduler: %s Pod not scheduled: %v", testPod.Name, err)
|
t.Errorf("Test MultiScheduler: %s Pod not scheduled: %v", testPod.Name, err)
|
||||||
} else {
|
} else {
|
||||||
@ -505,12 +507,13 @@ func TestMultiScheduler(t *testing.T) {
|
|||||||
// 5. create and start a scheduler with name "foo-scheduler"
|
// 5. create and start a scheduler with name "foo-scheduler"
|
||||||
clientSet2 := clientset.NewForConfigOrDie(&restclient.Config{Host: context.httpServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Groups[v1.GroupName].GroupVersion()}})
|
clientSet2 := clientset.NewForConfigOrDie(&restclient.Config{Host: context.httpServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Groups[v1.GroupName].GroupVersion()}})
|
||||||
informerFactory2 := informers.NewSharedInformerFactory(context.clientSet, 0)
|
informerFactory2 := informers.NewSharedInformerFactory(context.clientSet, 0)
|
||||||
|
podInformer2 := factory.NewPodInformer(context.clientSet, 0, fooScheduler)
|
||||||
|
|
||||||
schedulerConfigFactory2 := factory.NewConfigFactory(
|
schedulerConfigFactory2 := factory.NewConfigFactory(
|
||||||
fooScheduler,
|
fooScheduler,
|
||||||
clientSet2,
|
clientSet2,
|
||||||
informerFactory2.Core().V1().Nodes(),
|
informerFactory2.Core().V1().Nodes(),
|
||||||
informerFactory2.Core().V1().Pods(),
|
podInformer2,
|
||||||
informerFactory2.Core().V1().PersistentVolumes(),
|
informerFactory2.Core().V1().PersistentVolumes(),
|
||||||
informerFactory2.Core().V1().PersistentVolumeClaims(),
|
informerFactory2.Core().V1().PersistentVolumeClaims(),
|
||||||
informerFactory2.Core().V1().ReplicationControllers(),
|
informerFactory2.Core().V1().ReplicationControllers(),
|
||||||
@ -528,6 +531,7 @@ func TestMultiScheduler(t *testing.T) {
|
|||||||
eventBroadcaster2 := record.NewBroadcaster()
|
eventBroadcaster2 := record.NewBroadcaster()
|
||||||
schedulerConfig2.Recorder = eventBroadcaster2.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: fooScheduler})
|
schedulerConfig2.Recorder = eventBroadcaster2.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: fooScheduler})
|
||||||
eventBroadcaster2.StartRecordingToSink(&clientv1core.EventSinkImpl{Interface: clientv1core.New(clientSet2.CoreV1().RESTClient()).Events("")})
|
eventBroadcaster2.StartRecordingToSink(&clientv1core.EventSinkImpl{Interface: clientv1core.New(clientSet2.CoreV1().RESTClient()).Events("")})
|
||||||
|
go podInformer2.Informer().Run(schedulerConfig2.StopEverything)
|
||||||
informerFactory2.Start(schedulerConfig2.StopEverything)
|
informerFactory2.Start(schedulerConfig2.StopEverything)
|
||||||
|
|
||||||
sched2, _ := scheduler.NewFromConfigurator(&scheduler.FakeConfigurator{Config: schedulerConfig2}, nil...)
|
sched2, _ := scheduler.NewFromConfigurator(&scheduler.FakeConfigurator{Config: schedulerConfig2}, nil...)
|
||||||
|
@ -64,12 +64,12 @@ func initTest(t *testing.T, nsPrefix string) *TestContext {
|
|||||||
|
|
||||||
context.clientSet = clientset.NewForConfigOrDie(&restclient.Config{Host: context.httpServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Groups[v1.GroupName].GroupVersion()}})
|
context.clientSet = clientset.NewForConfigOrDie(&restclient.Config{Host: context.httpServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Groups[v1.GroupName].GroupVersion()}})
|
||||||
context.informerFactory = informers.NewSharedInformerFactory(context.clientSet, 0)
|
context.informerFactory = informers.NewSharedInformerFactory(context.clientSet, 0)
|
||||||
|
podInformer := factory.NewPodInformer(context.clientSet, 30*time.Second, v1.DefaultSchedulerName)
|
||||||
context.schedulerConfigFactory = factory.NewConfigFactory(
|
context.schedulerConfigFactory = factory.NewConfigFactory(
|
||||||
v1.DefaultSchedulerName,
|
v1.DefaultSchedulerName,
|
||||||
context.clientSet,
|
context.clientSet,
|
||||||
context.informerFactory.Core().V1().Nodes(),
|
context.informerFactory.Core().V1().Nodes(),
|
||||||
context.informerFactory.Core().V1().Pods(),
|
podInformer,
|
||||||
context.informerFactory.Core().V1().PersistentVolumes(),
|
context.informerFactory.Core().V1().PersistentVolumes(),
|
||||||
context.informerFactory.Core().V1().PersistentVolumeClaims(),
|
context.informerFactory.Core().V1().PersistentVolumeClaims(),
|
||||||
context.informerFactory.Core().V1().ReplicationControllers(),
|
context.informerFactory.Core().V1().ReplicationControllers(),
|
||||||
@ -88,6 +88,7 @@ func initTest(t *testing.T, nsPrefix string) *TestContext {
|
|||||||
eventBroadcaster := record.NewBroadcaster()
|
eventBroadcaster := record.NewBroadcaster()
|
||||||
context.schedulerConfig.Recorder = eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: v1.DefaultSchedulerName})
|
context.schedulerConfig.Recorder = eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: v1.DefaultSchedulerName})
|
||||||
eventBroadcaster.StartRecordingToSink(&clientv1core.EventSinkImpl{Interface: clientv1core.New(context.clientSet.CoreV1().RESTClient()).Events("")})
|
eventBroadcaster.StartRecordingToSink(&clientv1core.EventSinkImpl{Interface: clientv1core.New(context.clientSet.CoreV1().RESTClient()).Events("")})
|
||||||
|
go podInformer.Informer().Run(context.schedulerConfig.StopEverything)
|
||||||
context.informerFactory.Start(context.schedulerConfig.StopEverything)
|
context.informerFactory.Start(context.schedulerConfig.StopEverything)
|
||||||
context.scheduler, err = scheduler.NewFromConfigurator(&scheduler.FakeConfigurator{Config: context.schedulerConfig}, nil...)
|
context.scheduler, err = scheduler.NewFromConfigurator(&scheduler.FakeConfigurator{Config: context.schedulerConfig}, nil...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
Loading…
Reference in New Issue
Block a user