diff --git a/pkg/scheduler/BUILD b/pkg/scheduler/BUILD index b8e3c550a5a..ce46d72cbb8 100644 --- a/pkg/scheduler/BUILD +++ b/pkg/scheduler/BUILD @@ -17,7 +17,9 @@ go_library( "//pkg/scheduler/factory:go_default_library", "//pkg/scheduler/framework/v1alpha1:go_default_library", "//pkg/scheduler/internal/cache:go_default_library", + "//pkg/scheduler/internal/queue:go_default_library", "//pkg/scheduler/metrics:go_default_library", + "//pkg/scheduler/volumebinder:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/storage/v1:go_default_library", "//staging/src/k8s.io/api/storage/v1beta1:go_default_library", diff --git a/pkg/scheduler/eventhandlers.go b/pkg/scheduler/eventhandlers.go index 5ad4be1eed5..53462e221ba 100644 --- a/pkg/scheduler/eventhandlers.go +++ b/pkg/scheduler/eventhandlers.go @@ -41,7 +41,7 @@ func (sched *Scheduler) onPvAdd(obj interface{}) { // provisioning and binding process, will not trigger events to schedule pod // again. So we need to move pods to active queue on PV add for this // scenario. - sched.config.SchedulingQueue.MoveAllToActiveQueue() + sched.SchedulingQueue.MoveAllToActiveQueue() } func (sched *Scheduler) onPvUpdate(old, new interface{}) { @@ -49,15 +49,15 @@ func (sched *Scheduler) onPvUpdate(old, new interface{}) { // bindings due to conflicts if PVs are updated by PV controller or other // parties, then scheduler will add pod back to unschedulable queue. We // need to move pods to active queue on PV update for this scenario. - sched.config.SchedulingQueue.MoveAllToActiveQueue() + sched.SchedulingQueue.MoveAllToActiveQueue() } func (sched *Scheduler) onPvcAdd(obj interface{}) { - sched.config.SchedulingQueue.MoveAllToActiveQueue() + sched.SchedulingQueue.MoveAllToActiveQueue() } func (sched *Scheduler) onPvcUpdate(old, new interface{}) { - sched.config.SchedulingQueue.MoveAllToActiveQueue() + sched.SchedulingQueue.MoveAllToActiveQueue() } func (sched *Scheduler) onStorageClassAdd(obj interface{}) { @@ -74,20 +74,20 @@ func (sched *Scheduler) onStorageClassAdd(obj interface{}) { // We don't need to invalidate cached results because results will not be // cached for pod that has unbound immediate PVCs. if sc.VolumeBindingMode != nil && *sc.VolumeBindingMode == storagev1.VolumeBindingWaitForFirstConsumer { - sched.config.SchedulingQueue.MoveAllToActiveQueue() + sched.SchedulingQueue.MoveAllToActiveQueue() } } func (sched *Scheduler) onServiceAdd(obj interface{}) { - sched.config.SchedulingQueue.MoveAllToActiveQueue() + sched.SchedulingQueue.MoveAllToActiveQueue() } func (sched *Scheduler) onServiceUpdate(oldObj interface{}, newObj interface{}) { - sched.config.SchedulingQueue.MoveAllToActiveQueue() + sched.SchedulingQueue.MoveAllToActiveQueue() } func (sched *Scheduler) onServiceDelete(obj interface{}) { - sched.config.SchedulingQueue.MoveAllToActiveQueue() + sched.SchedulingQueue.MoveAllToActiveQueue() } func (sched *Scheduler) addNodeToCache(obj interface{}) { @@ -97,11 +97,11 @@ func (sched *Scheduler) addNodeToCache(obj interface{}) { return } - if err := sched.config.SchedulerCache.AddNode(node); err != nil { + if err := sched.SchedulerCache.AddNode(node); err != nil { klog.Errorf("scheduler cache AddNode failed: %v", err) } - sched.config.SchedulingQueue.MoveAllToActiveQueue() + sched.SchedulingQueue.MoveAllToActiveQueue() } func (sched *Scheduler) updateNodeInCache(oldObj, newObj interface{}) { @@ -116,7 +116,7 @@ func (sched *Scheduler) updateNodeInCache(oldObj, newObj interface{}) { return } - if err := sched.config.SchedulerCache.UpdateNode(oldNode, newNode); err != nil { + if err := sched.SchedulerCache.UpdateNode(oldNode, newNode); err != nil { klog.Errorf("scheduler cache UpdateNode failed: %v", err) } @@ -125,8 +125,8 @@ func (sched *Scheduler) updateNodeInCache(oldObj, newObj interface{}) { // to save processing cycles. We still trigger a move to active queue to cover the case // that a pod being processed by the scheduler is determined unschedulable. We want this // pod to be reevaluated when a change in the cluster happens. - if sched.config.SchedulingQueue.NumUnschedulablePods() == 0 || nodeSchedulingPropertiesChanged(newNode, oldNode) { - sched.config.SchedulingQueue.MoveAllToActiveQueue() + if sched.SchedulingQueue.NumUnschedulablePods() == 0 || nodeSchedulingPropertiesChanged(newNode, oldNode) { + sched.SchedulingQueue.MoveAllToActiveQueue() } } @@ -151,7 +151,7 @@ func (sched *Scheduler) deleteNodeFromCache(obj interface{}) { // invalidation and then snapshot the cache itself. If the cache is // snapshotted before updates are written, we would update equivalence // cache with stale information which is based on snapshot of old cache. - if err := sched.config.SchedulerCache.RemoveNode(node); err != nil { + if err := sched.SchedulerCache.RemoveNode(node); err != nil { klog.Errorf("scheduler cache RemoveNode failed: %v", err) } } @@ -163,11 +163,11 @@ func (sched *Scheduler) onCSINodeAdd(obj interface{}) { return } - if err := sched.config.SchedulerCache.AddCSINode(csiNode); err != nil { + if err := sched.SchedulerCache.AddCSINode(csiNode); err != nil { klog.Errorf("scheduler cache AddCSINode failed: %v", err) } - sched.config.SchedulingQueue.MoveAllToActiveQueue() + sched.SchedulingQueue.MoveAllToActiveQueue() } func (sched *Scheduler) onCSINodeUpdate(oldObj, newObj interface{}) { @@ -183,11 +183,11 @@ func (sched *Scheduler) onCSINodeUpdate(oldObj, newObj interface{}) { return } - if err := sched.config.SchedulerCache.UpdateCSINode(oldCSINode, newCSINode); err != nil { + if err := sched.SchedulerCache.UpdateCSINode(oldCSINode, newCSINode); err != nil { klog.Errorf("scheduler cache UpdateCSINode failed: %v", err) } - sched.config.SchedulingQueue.MoveAllToActiveQueue() + sched.SchedulingQueue.MoveAllToActiveQueue() } func (sched *Scheduler) onCSINodeDelete(obj interface{}) { @@ -207,13 +207,13 @@ func (sched *Scheduler) onCSINodeDelete(obj interface{}) { return } - if err := sched.config.SchedulerCache.RemoveCSINode(csiNode); err != nil { + if err := sched.SchedulerCache.RemoveCSINode(csiNode); err != nil { klog.Errorf("scheduler cache RemoveCSINode failed: %v", err) } } func (sched *Scheduler) addPodToSchedulingQueue(obj interface{}) { - if err := sched.config.SchedulingQueue.Add(obj.(*v1.Pod)); err != nil { + if err := sched.SchedulingQueue.Add(obj.(*v1.Pod)); err != nil { utilruntime.HandleError(fmt.Errorf("unable to queue %T: %v", obj, err)) } } @@ -223,7 +223,7 @@ func (sched *Scheduler) updatePodInSchedulingQueue(oldObj, newObj interface{}) { if sched.skipPodUpdate(pod) { return } - if err := sched.config.SchedulingQueue.Update(oldObj.(*v1.Pod), pod); err != nil { + if err := sched.SchedulingQueue.Update(oldObj.(*v1.Pod), pod); err != nil { utilruntime.HandleError(fmt.Errorf("unable to update %T: %v", newObj, err)) } } @@ -244,12 +244,12 @@ func (sched *Scheduler) deletePodFromSchedulingQueue(obj interface{}) { utilruntime.HandleError(fmt.Errorf("unable to handle object in %T: %T", sched, obj)) return } - if err := sched.config.SchedulingQueue.Delete(pod); err != nil { + if err := sched.SchedulingQueue.Delete(pod); err != nil { utilruntime.HandleError(fmt.Errorf("unable to dequeue %T: %v", obj, err)) } - if sched.config.VolumeBinder != nil { + if sched.VolumeBinder != nil { // Volume binder only wants to keep unassigned pods - sched.config.VolumeBinder.DeletePodBindings(pod) + sched.VolumeBinder.DeletePodBindings(pod) } } @@ -260,11 +260,11 @@ func (sched *Scheduler) addPodToCache(obj interface{}) { return } - if err := sched.config.SchedulerCache.AddPod(pod); err != nil { + if err := sched.SchedulerCache.AddPod(pod); err != nil { klog.Errorf("scheduler cache AddPod failed: %v", err) } - sched.config.SchedulingQueue.AssignedPodAdded(pod) + sched.SchedulingQueue.AssignedPodAdded(pod) } func (sched *Scheduler) updatePodInCache(oldObj, newObj interface{}) { @@ -284,11 +284,11 @@ func (sched *Scheduler) updatePodInCache(oldObj, newObj interface{}) { // invalidation and then snapshot the cache itself. If the cache is // snapshotted before updates are written, we would update equivalence // cache with stale information which is based on snapshot of old cache. - if err := sched.config.SchedulerCache.UpdatePod(oldPod, newPod); err != nil { + if err := sched.SchedulerCache.UpdatePod(oldPod, newPod); err != nil { klog.Errorf("scheduler cache UpdatePod failed: %v", err) } - sched.config.SchedulingQueue.AssignedPodUpdated(newPod) + sched.SchedulingQueue.AssignedPodUpdated(newPod) } func (sched *Scheduler) deletePodFromCache(obj interface{}) { @@ -312,11 +312,11 @@ func (sched *Scheduler) deletePodFromCache(obj interface{}) { // invalidation and then snapshot the cache itself. If the cache is // snapshotted before updates are written, we would update equivalence // cache with stale information which is based on snapshot of old cache. - if err := sched.config.SchedulerCache.RemovePod(pod); err != nil { + if err := sched.SchedulerCache.RemovePod(pod); err != nil { klog.Errorf("scheduler cache RemovePod failed: %v", err) } - sched.config.SchedulingQueue.MoveAllToActiveQueue() + sched.SchedulingQueue.MoveAllToActiveQueue() } // assignedPod selects pods that are assigned (scheduled and running). @@ -336,7 +336,7 @@ func responsibleForPod(pod *v1.Pod, schedulerName string) bool { // updated. func (sched *Scheduler) skipPodUpdate(pod *v1.Pod) bool { // Non-assumed pods should never be skipped. - isAssumed, err := sched.config.SchedulerCache.IsAssumedPod(pod) + isAssumed, err := sched.SchedulerCache.IsAssumedPod(pod) if err != nil { utilruntime.HandleError(fmt.Errorf("failed to check whether pod %s/%s is assumed: %v", pod.Namespace, pod.Name, err)) return false @@ -346,7 +346,7 @@ func (sched *Scheduler) skipPodUpdate(pod *v1.Pod) bool { } // Gets the assumed pod from the cache. - assumedPod, err := sched.config.SchedulerCache.GetPod(pod) + assumedPod, err := sched.SchedulerCache.GetPod(pod) if err != nil { utilruntime.HandleError(fmt.Errorf("failed to get assumed pod %s/%s from cache: %v", pod.Namespace, pod.Name, err)) return false diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index bf73150b13b..e284a4523cc 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -42,7 +42,9 @@ import ( "k8s.io/kubernetes/pkg/scheduler/factory" framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" + internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue" "k8s.io/kubernetes/pkg/scheduler/metrics" + "k8s.io/kubernetes/pkg/scheduler/volumebinder" ) const ( @@ -55,12 +57,55 @@ const ( // Scheduler watches for new unscheduled pods. It attempts to find // nodes that they fit on and writes bindings back to the api server. type Scheduler struct { - config *factory.Config + // It is expected that changes made via SchedulerCache will be observed + // by NodeLister and Algorithm. + SchedulerCache internalcache.Cache + + Algorithm core.ScheduleAlgorithm + GetBinder func(pod *v1.Pod) factory.Binder + // PodConditionUpdater is used only in case of scheduling errors. If we succeed + // with scheduling, PodScheduled condition will be updated in apiserver in /bind + // handler so that binding and setting PodCondition it is atomic. + PodConditionUpdater factory.PodConditionUpdater + // PodPreemptor is used to evict pods and update 'NominatedNode' field of + // the preemptor pod. + PodPreemptor factory.PodPreemptor + // Framework runs scheduler plugins at configured extension points. + Framework framework.Framework + + // NextPod should be a function that blocks until the next pod + // is available. We don't use a channel for this, because scheduling + // a pod may take some amount of time and we don't want pods to get + // stale while they sit in a channel. + NextPod func() *v1.Pod + + // WaitForCacheSync waits for scheduler cache to populate. + // It returns true if it was successful, false if the controller should shutdown. + WaitForCacheSync func() bool + + // Error is called if there is an error. It is passed the pod in + // question, and the error + Error func(*v1.Pod, error) + + // Recorder is the EventRecorder to use + Recorder events.EventRecorder + + // Close this to shut down the scheduler. + StopEverything <-chan struct{} + + // VolumeBinder handles PVC/PV binding for the pod. + VolumeBinder *volumebinder.VolumeBinder + + // Disable pod preemption or not. + DisablePreemption bool + + // SchedulingQueue holds pods to be scheduled + SchedulingQueue internalqueue.SchedulingQueue } // Cache returns the cache in scheduler for test to check the data in scheduler. func (sched *Scheduler) Cache() internalcache.Cache { - return sched.config.SchedulerCache + return sched.SchedulerCache } type schedulerOptions struct { @@ -247,31 +292,39 @@ func initPolicyFromConfigMap(client clientset.Interface, policyRef *kubeschedule func NewFromConfig(config *factory.Config) *Scheduler { metrics.Register() return &Scheduler{ - config: config, + SchedulerCache: config.SchedulerCache, + Algorithm: config.Algorithm, + GetBinder: config.GetBinder, + PodConditionUpdater: config.PodConditionUpdater, + PodPreemptor: config.PodPreemptor, + Framework: config.Framework, + NextPod: config.NextPod, + WaitForCacheSync: config.WaitForCacheSync, + Error: config.Error, + Recorder: config.Recorder, + StopEverything: config.StopEverything, + VolumeBinder: config.VolumeBinder, + DisablePreemption: config.DisablePreemption, + SchedulingQueue: config.SchedulingQueue, } } // Run begins watching and scheduling. It waits for cache to be synced, then starts a goroutine and returns immediately. func (sched *Scheduler) Run() { - if !sched.config.WaitForCacheSync() { + if !sched.WaitForCacheSync() { return } - go wait.Until(sched.scheduleOne, 0, sched.config.StopEverything) -} - -// Config returns scheduler's config pointer. It is exposed for testing purposes. -func (sched *Scheduler) Config() *factory.Config { - return sched.config + go wait.Until(sched.scheduleOne, 0, sched.StopEverything) } // recordFailedSchedulingEvent records an event for the pod that indicates the // pod has failed to schedule. // NOTE: This function modifies "pod". "pod" should be copied before being passed. func (sched *Scheduler) recordSchedulingFailure(pod *v1.Pod, err error, reason string, message string) { - sched.config.Error(pod, err) - sched.config.Recorder.Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", message) - if err := sched.config.PodConditionUpdater.Update(pod, &v1.PodCondition{ + sched.Error(pod, err) + sched.Recorder.Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", message) + if err := sched.PodConditionUpdater.Update(pod, &v1.PodCondition{ Type: v1.PodScheduled, Status: v1.ConditionFalse, Reason: reason, @@ -284,7 +337,7 @@ func (sched *Scheduler) recordSchedulingFailure(pod *v1.Pod, err error, reason s // schedule implements the scheduling algorithm and returns the suggested result(host, // evaluated nodes number,feasible nodes number). func (sched *Scheduler) schedule(pod *v1.Pod, pluginContext *framework.PluginContext) (core.ScheduleResult, error) { - result, err := sched.config.Algorithm.Schedule(pod, pluginContext) + result, err := sched.Algorithm.Schedule(pod, pluginContext) if err != nil { pod = pod.DeepCopy() sched.recordSchedulingFailure(pod, err, v1.PodReasonUnschedulable, err.Error()) @@ -297,13 +350,13 @@ func (sched *Scheduler) schedule(pod *v1.Pod, pluginContext *framework.PluginCon // If it succeeds, it adds the name of the node where preemption has happened to the pod spec. // It returns the node name and an error if any. func (sched *Scheduler) preempt(fwk framework.Framework, preemptor *v1.Pod, scheduleErr error) (string, error) { - preemptor, err := sched.config.PodPreemptor.GetUpdatedPod(preemptor) + preemptor, err := sched.PodPreemptor.GetUpdatedPod(preemptor) if err != nil { klog.Errorf("Error getting the updated preemptor pod object: %v", err) return "", err } - node, victims, nominatedPodsToClear, err := sched.config.Algorithm.Preempt(preemptor, scheduleErr) + node, victims, nominatedPodsToClear, err := sched.Algorithm.Preempt(preemptor, scheduleErr) if err != nil { klog.Errorf("Error preempting victims to make room for %v/%v: %v", preemptor.Namespace, preemptor.Name, err) return "", err @@ -314,18 +367,18 @@ func (sched *Scheduler) preempt(fwk framework.Framework, preemptor *v1.Pod, sche // Update the scheduling queue with the nominated pod information. Without // this, there would be a race condition between the next scheduling cycle // and the time the scheduler receives a Pod Update for the nominated pod. - sched.config.SchedulingQueue.UpdateNominatedPodForNode(preemptor, nodeName) + sched.SchedulingQueue.UpdateNominatedPodForNode(preemptor, nodeName) // Make a call to update nominated node name of the pod on the API server. - err = sched.config.PodPreemptor.SetNominatedNodeName(preemptor, nodeName) + err = sched.PodPreemptor.SetNominatedNodeName(preemptor, nodeName) if err != nil { klog.Errorf("Error in preemption process. Cannot set 'NominatedPod' on pod %v/%v: %v", preemptor.Namespace, preemptor.Name, err) - sched.config.SchedulingQueue.DeleteNominatedPodIfExists(preemptor) + sched.SchedulingQueue.DeleteNominatedPodIfExists(preemptor) return "", err } for _, victim := range victims { - if err := sched.config.PodPreemptor.DeletePod(victim); err != nil { + if err := sched.PodPreemptor.DeletePod(victim); err != nil { klog.Errorf("Error preempting pod %v/%v: %v", victim.Namespace, victim.Name, err) return "", err } @@ -333,7 +386,7 @@ func (sched *Scheduler) preempt(fwk framework.Framework, preemptor *v1.Pod, sche if waitingPod := fwk.GetWaitingPod(victim.UID); waitingPod != nil { waitingPod.Reject("preempted") } - sched.config.Recorder.Eventf(victim, preemptor, v1.EventTypeNormal, "Preempted", "Preempting", "Preempted by %v/%v on node %v", preemptor.Namespace, preemptor.Name, nodeName) + sched.Recorder.Eventf(victim, preemptor, v1.EventTypeNormal, "Preempted", "Preempting", "Preempted by %v/%v on node %v", preemptor.Namespace, preemptor.Name, nodeName) } metrics.PreemptionVictims.Set(float64(len(victims))) @@ -344,7 +397,7 @@ func (sched *Scheduler) preempt(fwk framework.Framework, preemptor *v1.Pod, sche // function of generic_scheduler.go returns the pod itself for removal of // the 'NominatedPod' field. for _, p := range nominatedPodsToClear { - rErr := sched.config.PodPreemptor.RemoveNominatedNodeName(p) + rErr := sched.PodPreemptor.RemoveNominatedNodeName(p) if rErr != nil { klog.Errorf("Cannot remove 'NominatedPod' field of pod: %v", rErr) // We do not return as this error is not critical. @@ -357,7 +410,7 @@ func (sched *Scheduler) preempt(fwk framework.Framework, preemptor *v1.Pod, sche // // This function modifies assumed if volume binding is required. func (sched *Scheduler) assumeVolumes(assumed *v1.Pod, host string) (allBound bool, err error) { - allBound, err = sched.config.VolumeBinder.Binder.AssumePodVolumes(assumed, host) + allBound, err = sched.VolumeBinder.Binder.AssumePodVolumes(assumed, host) if err != nil { sched.recordSchedulingFailure(assumed, err, SchedulerError, fmt.Sprintf("AssumePodVolumes failed: %v", err)) @@ -372,12 +425,12 @@ func (sched *Scheduler) assumeVolumes(assumed *v1.Pod, host string) (allBound bo // retry scheduling. func (sched *Scheduler) bindVolumes(assumed *v1.Pod) error { klog.V(5).Infof("Trying to bind volumes for pod \"%v/%v\"", assumed.Namespace, assumed.Name) - err := sched.config.VolumeBinder.Binder.BindPodVolumes(assumed) + err := sched.VolumeBinder.Binder.BindPodVolumes(assumed) if err != nil { klog.V(1).Infof("Failed to bind volumes for pod \"%v/%v\": %v", assumed.Namespace, assumed.Name, err) // Unassume the Pod and retry scheduling - if forgetErr := sched.config.SchedulerCache.ForgetPod(assumed); forgetErr != nil { + if forgetErr := sched.SchedulerCache.ForgetPod(assumed); forgetErr != nil { klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr) } @@ -398,7 +451,7 @@ func (sched *Scheduler) assume(assumed *v1.Pod, host string) error { // immediately. assumed.Spec.NodeName = host - if err := sched.config.SchedulerCache.AssumePod(assumed); err != nil { + if err := sched.SchedulerCache.AssumePod(assumed); err != nil { klog.Errorf("scheduler cache AssumePod failed: %v", err) // This is most probably result of a BUG in retrying logic. @@ -411,8 +464,8 @@ func (sched *Scheduler) assume(assumed *v1.Pod, host string) error { return err } // if "assumed" is a nominated pod, we should remove it from internal cache - if sched.config.SchedulingQueue != nil { - sched.config.SchedulingQueue.DeleteNominatedPodIfExists(assumed) + if sched.SchedulingQueue != nil { + sched.SchedulingQueue.DeleteNominatedPodIfExists(assumed) } return nil @@ -422,14 +475,14 @@ func (sched *Scheduler) assume(assumed *v1.Pod, host string) error { // handle binding metrics internally. func (sched *Scheduler) bind(assumed *v1.Pod, targetNode string, pluginContext *framework.PluginContext) error { bindingStart := time.Now() - bindStatus := sched.config.Framework.RunBindPlugins(pluginContext, assumed, targetNode) + bindStatus := sched.Framework.RunBindPlugins(pluginContext, assumed, targetNode) var err error if !bindStatus.IsSuccess() { if bindStatus.Code() == framework.Skip { // All bind plugins chose to skip binding of this pod, call original binding function. // If binding succeeds then PodScheduled condition will be updated in apiserver so that // it's atomic with setting host. - err = sched.config.GetBinder(assumed).Bind(&v1.Binding{ + err = sched.GetBinder(assumed).Bind(&v1.Binding{ ObjectMeta: metav1.ObjectMeta{Namespace: assumed.Namespace, Name: assumed.Name, UID: assumed.UID}, Target: v1.ObjectReference{ Kind: "Node", @@ -440,12 +493,12 @@ func (sched *Scheduler) bind(assumed *v1.Pod, targetNode string, pluginContext * err = fmt.Errorf("Bind failure, code: %d: %v", bindStatus.Code(), bindStatus.Message()) } } - if finErr := sched.config.SchedulerCache.FinishBinding(assumed); finErr != nil { + if finErr := sched.SchedulerCache.FinishBinding(assumed); finErr != nil { klog.Errorf("scheduler cache FinishBinding failed: %v", finErr) } if err != nil { klog.V(1).Infof("Failed to bind pod: %v/%v", assumed.Namespace, assumed.Name) - if err := sched.config.SchedulerCache.ForgetPod(assumed); err != nil { + if err := sched.SchedulerCache.ForgetPod(assumed); err != nil { klog.Errorf("scheduler cache ForgetPod failed: %v", err) } return err @@ -455,21 +508,21 @@ func (sched *Scheduler) bind(assumed *v1.Pod, targetNode string, pluginContext * metrics.DeprecatedBindingLatency.Observe(metrics.SinceInMicroseconds(bindingStart)) metrics.SchedulingLatency.WithLabelValues(metrics.Binding).Observe(metrics.SinceInSeconds(bindingStart)) metrics.DeprecatedSchedulingLatency.WithLabelValues(metrics.Binding).Observe(metrics.SinceInSeconds(bindingStart)) - sched.config.Recorder.Eventf(assumed, nil, v1.EventTypeNormal, "Scheduled", "Binding", "Successfully assigned %v/%v to %v", assumed.Namespace, assumed.Name, targetNode) + sched.Recorder.Eventf(assumed, nil, v1.EventTypeNormal, "Scheduled", "Binding", "Successfully assigned %v/%v to %v", assumed.Namespace, assumed.Name, targetNode) return nil } // scheduleOne does the entire scheduling workflow for a single pod. It is serialized on the scheduling algorithm's host fitting. func (sched *Scheduler) scheduleOne() { - fwk := sched.config.Framework + fwk := sched.Framework - pod := sched.config.NextPod() + pod := sched.NextPod() // pod could be nil when schedulerQueue is closed if pod == nil { return } if pod.DeletionTimestamp != nil { - sched.config.Recorder.Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", "skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name) + sched.Recorder.Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", "skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name) klog.V(3).Infof("Skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name) return } @@ -486,7 +539,7 @@ func (sched *Scheduler) scheduleOne() { // will fit due to the preemption. It is also possible that a different pod will schedule // into the resources that were preempted, but this is harmless. if fitError, ok := err.(*core.FitError); ok { - if sched.config.DisablePreemption { + if sched.DisablePreemption { klog.V(3).Infof("Pod priority feature is not enabled or preemption is disabled by scheduler configuration." + " No preemption is performed.") } else { diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index aa5c166eb20..f7ae30d501e 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -767,7 +767,7 @@ func setupTestSchedulerWithVolumeBinding(fakeVolumeBinder *volumebinder.VolumeBi s, bindingChan, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, predicateMap, recorder) informerFactory.Start(stop) informerFactory.WaitForCacheSync(stop) - s.config.VolumeBinder = fakeVolumeBinder + s.VolumeBinder = fakeVolumeBinder return s, bindingChan, errChan } diff --git a/test/integration/scheduler/scheduler_test.go b/test/integration/scheduler/scheduler_test.go index 718b5858d19..aa78b738c58 100644 --- a/test/integration/scheduler/scheduler_test.go +++ b/test/integration/scheduler/scheduler_test.go @@ -276,15 +276,13 @@ priorities: [] t.Fatalf("couldn't make scheduler config: %v", err) } - config := sched.Config() - // Verify that the config is applied correctly. schedPredicates := sets.NewString() - for k := range config.Algorithm.Predicates() { + for k := range sched.Algorithm.Predicates() { schedPredicates.Insert(k) } schedPrioritizers := sets.NewString() - for _, p := range config.Algorithm.Prioritizers() { + for _, p := range sched.Algorithm.Prioritizers() { schedPrioritizers.Insert(p.Name) } if !schedPredicates.Equal(test.expectedPredicates) { diff --git a/test/integration/scheduler/util.go b/test/integration/scheduler/util.go index cd6fdaadc1e..41dea8a7441 100644 --- a/test/integration/scheduler/util.go +++ b/test/integration/scheduler/util.go @@ -204,6 +204,13 @@ func initTestSchedulerWithOptions( // set DisablePreemption option context.schedulerConfig.DisablePreemption = disablePreemption + eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{ + Interface: context.clientSet.EventsV1beta1().Events(""), + }) + context.schedulerConfig.Recorder = eventBroadcaster.NewRecorder( + legacyscheme.Scheme, + v1.DefaultSchedulerName, + ) context.scheduler = scheduler.NewFromConfig(context.schedulerConfig) @@ -224,13 +231,6 @@ func initTestSchedulerWithOptions( controller.WaitForCacheSync("scheduler", context.schedulerConfig.StopEverything, podInformer.Informer().HasSynced) } - eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{ - Interface: context.clientSet.EventsV1beta1().Events(""), - }) - context.schedulerConfig.Recorder = eventBroadcaster.NewRecorder( - legacyscheme.Scheme, - v1.DefaultSchedulerName, - ) stopCh := make(chan struct{}) eventBroadcaster.StartRecordingToSink(stopCh) @@ -727,7 +727,7 @@ func waitForPDBsStable(context *testContext, pdbs []*policy.PodDisruptionBudget, // waitCachedPodsStable waits until scheduler cache has the given pods. func waitCachedPodsStable(context *testContext, pods []*v1.Pod) error { return wait.Poll(time.Second, 30*time.Second, func() (bool, error) { - cachedPods, err := context.scheduler.Config().SchedulerCache.List(labels.Everything()) + cachedPods, err := context.scheduler.SchedulerCache.List(labels.Everything()) if err != nil { return false, err } @@ -739,7 +739,7 @@ func waitCachedPodsStable(context *testContext, pods []*v1.Pod) error { if err1 != nil { return false, err1 } - cachedPod, err2 := context.scheduler.Config().SchedulerCache.GetPod(actualPod) + cachedPod, err2 := context.scheduler.SchedulerCache.GetPod(actualPod) if err2 != nil || cachedPod == nil { return false, err2 } diff --git a/test/integration/volumescheduling/util.go b/test/integration/volumescheduling/util.go index f7d59b7a2b6..01c9e17206c 100644 --- a/test/integration/volumescheduling/util.go +++ b/test/integration/volumescheduling/util.go @@ -123,6 +123,13 @@ func initTestSchedulerWithOptions( // set DisablePreemption option context.schedulerConfig.DisablePreemption = false + eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{ + Interface: context.clientSet.EventsV1beta1().Events(""), + }) + context.schedulerConfig.Recorder = eventBroadcaster.NewRecorder( + legacyscheme.Scheme, + v1.DefaultSchedulerName, + ) context.scheduler = scheduler.NewFromConfig(context.schedulerConfig) @@ -137,13 +144,6 @@ func initTestSchedulerWithOptions( context.informerFactory.Storage().V1beta1().CSINodes(), ) - eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{ - Interface: context.clientSet.EventsV1beta1().Events(""), - }) - context.schedulerConfig.Recorder = eventBroadcaster.NewRecorder( - legacyscheme.Scheme, - v1.DefaultSchedulerName, - ) stopCh := make(chan struct{}) eventBroadcaster.StartRecordingToSink(stopCh)