diff --git a/pkg/scheduler/BUILD b/pkg/scheduler/BUILD index f2eb66349e1..c8a8e3f6610 100644 --- a/pkg/scheduler/BUILD +++ b/pkg/scheduler/BUILD @@ -9,6 +9,7 @@ go_library( importpath = "k8s.io/kubernetes/pkg/scheduler", visibility = ["//visibility:public"], deps = [ + "//pkg/api/v1/pod:go_default_library", "//pkg/features:go_default_library", "//pkg/scheduler/api:go_default_library", "//pkg/scheduler/api/latest:go_default_library", diff --git a/pkg/scheduler/factory/BUILD b/pkg/scheduler/factory/BUILD index 68741039a07..a8b8e84a9a6 100644 --- a/pkg/scheduler/factory/BUILD +++ b/pkg/scheduler/factory/BUILD @@ -9,7 +9,6 @@ go_library( importpath = "k8s.io/kubernetes/pkg/scheduler/factory", visibility = ["//visibility:public"], deps = [ - "//pkg/api/v1/pod:go_default_library", "//pkg/features:go_default_library", "//pkg/scheduler/algorithm:go_default_library", "//pkg/scheduler/algorithm/predicates:go_default_library", diff --git a/pkg/scheduler/factory/factory.go b/pkg/scheduler/factory/factory.go index a7e7d61db55..b644188a058 100644 --- a/pkg/scheduler/factory/factory.go +++ b/pkg/scheduler/factory/factory.go @@ -45,7 +45,6 @@ import ( "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/events" "k8s.io/klog" - podutil "k8s.io/kubernetes/pkg/api/v1/pod" "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/scheduler/algorithm" "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" @@ -72,12 +71,6 @@ type Binder interface { Bind(binding *v1.Binding) error } -// PodConditionUpdater updates the condition of a pod based on the passed -// PodCondition -type PodConditionUpdater interface { - Update(pod *v1.Pod, podCondition *v1.PodCondition) error -} - // Config is an implementation of the Scheduler's configured input data. // TODO over time we should make this struct a hidden implementation detail of the scheduler. type Config struct { @@ -85,10 +78,6 @@ type Config struct { Algorithm core.ScheduleAlgorithm GetBinder func(pod *v1.Pod) Binder - // PodConditionUpdater is used only in case of scheduling errors. If we succeed - // with scheduling, PodScheduled condition will be updated in apiserver in /bind - // handler so that binding and setting PodCondition it is atomic. - PodConditionUpdater PodConditionUpdater // PodPreemptor is used to evict pods and update 'NominatedNode' field of // the preemptor pod. PodPreemptor PodPreemptor @@ -454,12 +443,11 @@ func (c *Configurator) CreateFromKeys(predicateKeys, priorityKeys sets.String, e ) return &Config{ - SchedulerCache: c.schedulerCache, - Algorithm: algo, - GetBinder: getBinderFunc(c.client, extenders), - PodConditionUpdater: &podConditionUpdater{c.client}, - PodPreemptor: &podPreemptor{c.client}, - Framework: framework, + SchedulerCache: c.schedulerCache, + Algorithm: algo, + GetBinder: getBinderFunc(c.client, extenders), + PodPreemptor: &podPreemptor{c.client}, + Framework: framework, WaitForCacheSync: func() bool { return cache.WaitForCacheSync(c.StopEverything, c.scheduledPodsHasSynced) }, @@ -705,19 +693,6 @@ func (b *binder) Bind(binding *v1.Binding) error { return b.Client.CoreV1().Pods(binding.Namespace).Bind(binding) } -type podConditionUpdater struct { - Client clientset.Interface -} - -func (p *podConditionUpdater) Update(pod *v1.Pod, condition *v1.PodCondition) error { - klog.V(3).Infof("Updating pod condition for %s/%s to (%s==%s, Reason=%s)", pod.Namespace, pod.Name, condition.Type, condition.Status, condition.Reason) - if podutil.UpdatePodCondition(&pod.Status, condition) { - _, err := p.Client.CoreV1().Pods(pod.Namespace).UpdateStatus(pod) - return err - } - return nil -} - type podPreemptor struct { Client clientset.Interface } diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index bc3d7f620dc..3654609ae1e 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -35,6 +35,7 @@ import ( storageinformersv1beta1 "k8s.io/client-go/informers/storage/v1beta1" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/events" + podutil "k8s.io/kubernetes/pkg/api/v1/pod" schedulerapi "k8s.io/kubernetes/pkg/scheduler/api" latestschedulerapi "k8s.io/kubernetes/pkg/scheduler/api/latest" kubeschedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config" @@ -54,6 +55,12 @@ const ( SchedulerError = "SchedulerError" ) +// podConditionUpdater updates the condition of a pod based on the passed +// PodCondition +type podConditionUpdater interface { + update(pod *v1.Pod, podCondition *v1.PodCondition) error +} + // Scheduler watches for new unscheduled pods. It attempts to find // nodes that they fit on and writes bindings back to the api server. type Scheduler struct { @@ -66,7 +73,7 @@ type Scheduler struct { // PodConditionUpdater is used only in case of scheduling errors. If we succeed // with scheduling, PodScheduled condition will be updated in apiserver in /bind // handler so that binding and setting PodCondition it is atomic. - PodConditionUpdater factory.PodConditionUpdater + podConditionUpdater podConditionUpdater // PodPreemptor is used to evict pods and update 'NominatedNode' field of // the preemptor pod. PodPreemptor factory.PodPreemptor @@ -247,7 +254,7 @@ func New(client clientset.Interface, // Create the scheduler. sched := NewFromConfig(config) - + sched.podConditionUpdater = &podConditionUpdaterImpl{client} AddAllEventHandlers(sched, options.schedulerName, nodeInformer, podInformer, pvInformer, pvcInformer, serviceInformer, storageClassInformer, csiNodeInformer) return sched, nil } @@ -292,20 +299,19 @@ func initPolicyFromConfigMap(client clientset.Interface, policyRef *kubeschedule func NewFromConfig(config *factory.Config) *Scheduler { metrics.Register() return &Scheduler{ - SchedulerCache: config.SchedulerCache, - Algorithm: config.Algorithm, - GetBinder: config.GetBinder, - PodConditionUpdater: config.PodConditionUpdater, - PodPreemptor: config.PodPreemptor, - Framework: config.Framework, - NextPod: config.NextPod, - WaitForCacheSync: config.WaitForCacheSync, - Error: config.Error, - Recorder: config.Recorder, - StopEverything: config.StopEverything, - VolumeBinder: config.VolumeBinder, - DisablePreemption: config.DisablePreemption, - SchedulingQueue: config.SchedulingQueue, + SchedulerCache: config.SchedulerCache, + Algorithm: config.Algorithm, + GetBinder: config.GetBinder, + PodPreemptor: config.PodPreemptor, + Framework: config.Framework, + NextPod: config.NextPod, + WaitForCacheSync: config.WaitForCacheSync, + Error: config.Error, + Recorder: config.Recorder, + StopEverything: config.StopEverything, + VolumeBinder: config.VolumeBinder, + DisablePreemption: config.DisablePreemption, + SchedulingQueue: config.SchedulingQueue, } } @@ -324,7 +330,7 @@ func (sched *Scheduler) Run() { func (sched *Scheduler) recordSchedulingFailure(pod *v1.Pod, err error, reason string, message string) { sched.Error(pod, err) sched.Recorder.Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", message) - if err := sched.PodConditionUpdater.Update(pod, &v1.PodCondition{ + if err := sched.podConditionUpdater.update(pod, &v1.PodCondition{ Type: v1.PodScheduled, Status: v1.ConditionFalse, Reason: reason, @@ -670,6 +676,19 @@ func (sched *Scheduler) scheduleOne() { }() } +type podConditionUpdaterImpl struct { + Client clientset.Interface +} + +func (p *podConditionUpdaterImpl) update(pod *v1.Pod, condition *v1.PodCondition) error { + klog.V(3).Infof("Updating pod condition for %s/%s to (%s==%s, Reason=%s)", pod.Namespace, pod.Name, condition.Type, condition.Status, condition.Reason) + if podutil.UpdatePodCondition(&pod.Status, condition) { + _, err := p.Client.CoreV1().Pods(pod.Namespace).UpdateStatus(pod) + return err + } + return nil +} + // nodeResourceString returns a string representation of node resources. func nodeResourceString(n *v1.Node) string { if n == nil { diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 9c39cea061a..0a0ffd7023f 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -74,7 +74,7 @@ func (fb fakeBinder) Bind(binding *v1.Binding) error { return fb.b(binding) } type fakePodConditionUpdater struct{} -func (fc fakePodConditionUpdater) Update(pod *v1.Pod, podCondition *v1.PodCondition) error { +func (fc fakePodConditionUpdater) update(pod *v1.Pod, podCondition *v1.PodCondition) error { return nil } @@ -286,7 +286,7 @@ func TestScheduler(t *testing.T) { }, } - s := NewFromConfig(&factory.Config{ + s := &Scheduler{ SchedulerCache: sCache, Algorithm: item.algo, GetBinder: func(pod *v1.Pod) factory.Binder { @@ -295,7 +295,7 @@ func TestScheduler(t *testing.T) { return item.injectBindError }} }, - PodConditionUpdater: fakePodConditionUpdater{}, + podConditionUpdater: fakePodConditionUpdater{}, Error: func(p *v1.Pod, err error) { gotPod = p gotError = err @@ -306,7 +306,7 @@ func TestScheduler(t *testing.T) { Framework: emptyFramework, Recorder: eventBroadcaster.NewRecorder(scheme.Scheme, "scheduler"), VolumeBinder: volumebinder.NewFakeVolumeBinder(&volumescheduling.FakeVolumeBinderConfig{AllBound: true}), - }) + } called := make(chan struct{}) stopFunc := eventBroadcaster.StartEventWatcher(func(obj runtime.Object) { e, _ := obj.(*v1beta1.Event) @@ -670,7 +670,7 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C bindingChan := make(chan *v1.Binding, 1) errChan := make(chan error, 1) - config := &factory.Config{ + sched := &Scheduler{ SchedulerCache: scache, Algorithm: algo, GetBinder: func(pod *v1.Pod) factory.Binder { @@ -686,18 +686,16 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C errChan <- err }, Recorder: &events.FakeRecorder{}, - PodConditionUpdater: fakePodConditionUpdater{}, + podConditionUpdater: fakePodConditionUpdater{}, PodPreemptor: fakePodPreemptor{}, Framework: emptyFramework, VolumeBinder: volumebinder.NewFakeVolumeBinder(&volumescheduling.FakeVolumeBinderConfig{AllBound: true}), } if recorder != nil { - config.Recorder = recorder + sched.Recorder = recorder } - sched := NewFromConfig(config) - return sched, bindingChan, errChan } @@ -722,7 +720,7 @@ func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, sc ) bindingChan := make(chan *v1.Binding, 2) - sched := NewFromConfig(&factory.Config{ + sched := &Scheduler{ SchedulerCache: scache, Algorithm: algo, GetBinder: func(pod *v1.Pod) factory.Binder { @@ -742,12 +740,12 @@ func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, sc queuedPodStore.AddIfNotPresent(p) }, Recorder: &events.FakeRecorder{}, - PodConditionUpdater: fakePodConditionUpdater{}, + podConditionUpdater: fakePodConditionUpdater{}, PodPreemptor: fakePodPreemptor{}, StopEverything: stop, Framework: framework, VolumeBinder: volumebinder.NewFakeVolumeBinder(&volumescheduling.FakeVolumeBinderConfig{AllBound: true}), - }) + } return sched, bindingChan }