From 6bbc607d7251088f454aee941e4e3bee45f7e8f0 Mon Sep 17 00:00:00 2001 From: Ahmad Diaa Date: Wed, 2 Oct 2019 07:20:35 +0200 Subject: [PATCH] move PodPreemptor to scheduler --- pkg/scheduler/factory/factory.go | 39 ------------------------- pkg/scheduler/scheduler.go | 50 ++++++++++++++++++++++++++++---- pkg/scheduler/scheduler_test.go | 12 ++++---- 3 files changed, 50 insertions(+), 51 deletions(-) diff --git a/pkg/scheduler/factory/factory.go b/pkg/scheduler/factory/factory.go index 51b495da83b..73e6a963c32 100644 --- a/pkg/scheduler/factory/factory.go +++ b/pkg/scheduler/factory/factory.go @@ -79,9 +79,6 @@ type Config struct { Algorithm core.ScheduleAlgorithm GetBinder func(pod *v1.Pod) Binder - // PodPreemptor is used to evict pods and update 'NominatedNode' field of - // the preemptor pod. - PodPreemptor PodPreemptor // Framework runs scheduler plugins at configured extension points. Framework framework.Framework @@ -119,15 +116,6 @@ type Config struct { PluginConfig []config.PluginConfig } -// PodPreemptor has methods needed to delete a pod and to update 'NominatedPod' -// field of the preemptor pod. -type PodPreemptor interface { - GetUpdatedPod(pod *v1.Pod) (*v1.Pod, error) - DeletePod(pod *v1.Pod) error - SetNominatedNodeName(pod *v1.Pod, nominatedNode string) error - RemoveNominatedNodeName(pod *v1.Pod) error -} - // Configurator defines I/O, caching, and other functionality needed to // construct a new scheduler. 
type Configurator struct { @@ -471,7 +459,6 @@ func (c *Configurator) CreateFromKeys(predicateKeys, priorityKeys sets.String, e SchedulerCache: c.schedulerCache, Algorithm: algo, GetBinder: getBinderFunc(c.client, extenders), - PodPreemptor: &podPreemptor{c.client}, Framework: framework, WaitForCacheSync: func() bool { return cache.WaitForCacheSync(c.StopEverything, c.scheduledPodsHasSynced) @@ -719,29 +706,3 @@ func (b *binder) Bind(binding *v1.Binding) error { klog.V(3).Infof("Attempting to bind %v to %v", binding.Name, binding.Target.Name) return b.Client.CoreV1().Pods(binding.Namespace).Bind(binding) } - -type podPreemptor struct { - Client clientset.Interface -} - -func (p *podPreemptor) GetUpdatedPod(pod *v1.Pod) (*v1.Pod, error) { - return p.Client.CoreV1().Pods(pod.Namespace).Get(pod.Name, metav1.GetOptions{}) -} - -func (p *podPreemptor) DeletePod(pod *v1.Pod) error { - return p.Client.CoreV1().Pods(pod.Namespace).Delete(pod.Name, &metav1.DeleteOptions{}) -} - -func (p *podPreemptor) SetNominatedNodeName(pod *v1.Pod, nominatedNodeName string) error { - podCopy := pod.DeepCopy() - podCopy.Status.NominatedNodeName = nominatedNodeName - _, err := p.Client.CoreV1().Pods(pod.Namespace).UpdateStatus(podCopy) - return err -} - -func (p *podPreemptor) RemoveNominatedNodeName(pod *v1.Pod) error { - if len(pod.Status.NominatedNodeName) == 0 { - return nil - } - return p.SetNominatedNodeName(pod, "") -} diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index df8fa1dcf89..78d27983f98 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -55,10 +55,21 @@ const ( // podConditionUpdater updates the condition of a pod based on the passed // PodCondition +// TODO (ahmad-diaa): Remove type and replace it with scheduler methods type podConditionUpdater interface { update(pod *v1.Pod, podCondition *v1.PodCondition) error } +// podPreemptor has methods needed to delete a pod and to update the 'NominatedNodeName' +// field of the preemptor pod. 
+// TODO (ahmad-diaa): Remove type and replace it with scheduler methods +type podPreemptor interface { + getUpdatedPod(pod *v1.Pod) (*v1.Pod, error) + deletePod(pod *v1.Pod) error + setNominatedNodeName(pod *v1.Pod, nominatedNode string) error + removeNominatedNodeName(pod *v1.Pod) error +} + // Scheduler watches for new unscheduled pods. It attempts to find // nodes that they fit on and writes bindings back to the api server. type Scheduler struct { @@ -74,7 +85,7 @@ type Scheduler struct { podConditionUpdater podConditionUpdater // PodPreemptor is used to evict pods and update 'NominatedNode' field of // the preemptor pod. - PodPreemptor factory.PodPreemptor + podPreemptor podPreemptor // Framework runs scheduler plugins at configured extension points. Framework framework.Framework @@ -344,6 +355,8 @@ func New(client clientset.Interface, // Create the scheduler. sched := NewFromConfig(config) sched.podConditionUpdater = &podConditionUpdaterImpl{client} + sched.podPreemptor = &podPreemptorImpl{client} + AddAllEventHandlers(sched, options.schedulerName, informerFactory, podInformer) return sched, nil } @@ -391,7 +404,6 @@ func NewFromConfig(config *factory.Config) *Scheduler { SchedulerCache: config.SchedulerCache, Algorithm: config.Algorithm, GetBinder: config.GetBinder, - PodPreemptor: config.PodPreemptor, Framework: config.Framework, NextPod: config.NextPod, WaitForCacheSync: config.WaitForCacheSync, @@ -434,7 +446,7 @@ func (sched *Scheduler) recordSchedulingFailure(podInfo *framework.PodInfo, err // If it succeeds, it adds the name of the node where preemption has happened to the pod spec. // It returns the node name and an error if any. 
func (sched *Scheduler) preempt(state *framework.CycleState, fwk framework.Framework, preemptor *v1.Pod, scheduleErr error) (string, error) { - preemptor, err := sched.PodPreemptor.GetUpdatedPod(preemptor) + preemptor, err := sched.podPreemptor.getUpdatedPod(preemptor) if err != nil { klog.Errorf("Error getting the updated preemptor pod object: %v", err) return "", err @@ -454,7 +466,7 @@ func (sched *Scheduler) preempt(state *framework.CycleState, fwk framework.Frame sched.SchedulingQueue.UpdateNominatedPodForNode(preemptor, nodeName) // Make a call to update nominated node name of the pod on the API server. - err = sched.PodPreemptor.SetNominatedNodeName(preemptor, nodeName) + err = sched.podPreemptor.setNominatedNodeName(preemptor, nodeName) if err != nil { klog.Errorf("Error in preemption process. Cannot set 'NominatedPod' on pod %v/%v: %v", preemptor.Namespace, preemptor.Name, err) sched.SchedulingQueue.DeleteNominatedPodIfExists(preemptor) @@ -462,7 +474,7 @@ func (sched *Scheduler) preempt(state *framework.CycleState, fwk framework.Frame } for _, victim := range victims { - if err := sched.PodPreemptor.DeletePod(victim); err != nil { + if err := sched.podPreemptor.deletePod(victim); err != nil { klog.Errorf("Error preempting pod %v/%v: %v", victim.Namespace, victim.Name, err) return "", err } @@ -481,7 +493,7 @@ func (sched *Scheduler) preempt(state *framework.CycleState, fwk framework.Frame // function of generic_scheduler.go returns the pod itself for removal of // the 'NominatedPod' field. for _, p := range nominatedPodsToClear { - rErr := sched.PodPreemptor.RemoveNominatedNodeName(p) + rErr := sched.podPreemptor.removeNominatedNodeName(p) if rErr != nil { klog.Errorf("Cannot remove 'NominatedPod' field of pod: %v", rErr) // We do not return as this error is not critical. 
@@ -756,6 +768,32 @@ func (p *podConditionUpdaterImpl) update(pod *v1.Pod, condition *v1.PodCondition return nil } +type podPreemptorImpl struct { + Client clientset.Interface +} + +func (p *podPreemptorImpl) getUpdatedPod(pod *v1.Pod) (*v1.Pod, error) { + return p.Client.CoreV1().Pods(pod.Namespace).Get(pod.Name, metav1.GetOptions{}) +} + +func (p *podPreemptorImpl) deletePod(pod *v1.Pod) error { + return p.Client.CoreV1().Pods(pod.Namespace).Delete(pod.Name, &metav1.DeleteOptions{}) +} + +func (p *podPreemptorImpl) setNominatedNodeName(pod *v1.Pod, nominatedNodeName string) error { + podCopy := pod.DeepCopy() + podCopy.Status.NominatedNodeName = nominatedNodeName + _, err := p.Client.CoreV1().Pods(pod.Namespace).UpdateStatus(podCopy) + return err +} + +func (p *podPreemptorImpl) removeNominatedNodeName(pod *v1.Pod) error { + if len(pod.Status.NominatedNodeName) == 0 { + return nil + } + return p.setNominatedNodeName(pod, "") +} + // nodeResourceString returns a string representation of node resources. 
func nodeResourceString(n *v1.Node) string { if n == nil { diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index e5cef8a20d5..18c5bdcb2d1 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -78,19 +78,19 @@ func (fc fakePodConditionUpdater) update(pod *v1.Pod, podCondition *v1.PodCondit type fakePodPreemptor struct{} -func (fp fakePodPreemptor) GetUpdatedPod(pod *v1.Pod) (*v1.Pod, error) { +func (fp fakePodPreemptor) getUpdatedPod(pod *v1.Pod) (*v1.Pod, error) { return pod, nil } -func (fp fakePodPreemptor) DeletePod(pod *v1.Pod) error { +func (fp fakePodPreemptor) deletePod(pod *v1.Pod) error { return nil } -func (fp fakePodPreemptor) SetNominatedNodeName(pod *v1.Pod, nomNodeName string) error { +func (fp fakePodPreemptor) setNominatedNodeName(pod *v1.Pod, nomNodeName string) error { return nil } -func (fp fakePodPreemptor) RemoveNominatedNodeName(pod *v1.Pod) error { +func (fp fakePodPreemptor) removeNominatedNodeName(pod *v1.Pod) error { return nil } @@ -674,7 +674,7 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C }, Recorder: &events.FakeRecorder{}, podConditionUpdater: fakePodConditionUpdater{}, - PodPreemptor: fakePodPreemptor{}, + podPreemptor: fakePodPreemptor{}, Framework: emptyFramework, VolumeBinder: volumebinder.NewFakeVolumeBinder(&volumescheduling.FakeVolumeBinderConfig{AllBound: true}), } @@ -728,7 +728,7 @@ func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, sc }, Recorder: &events.FakeRecorder{}, podConditionUpdater: fakePodConditionUpdater{}, - PodPreemptor: fakePodPreemptor{}, + podPreemptor: fakePodPreemptor{}, StopEverything: stop, Framework: fwk, VolumeBinder: volumebinder.NewFakeVolumeBinder(&volumescheduling.FakeVolumeBinderConfig{AllBound: true}),