use factory.Config fields directly in scheduler struct

Ahmad Diaa 2019-08-09 03:40:44 +02:00
parent 7420bb2214
commit 61ab77ef7a
7 changed files with 143 additions and 90 deletions
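Condensed, the change replaces the single config *factory.Config field in the Scheduler struct with the individual fields it carried, and NewFromConfig copies them across; every call site then drops the extra hop (sched.config.X becomes sched.X). An abbreviated sketch of the pattern, using only names that appear in the diff below (a few representative fields; imports omitted):

// After this commit, factory.Config's fields are exported fields of Scheduler itself.
type Scheduler struct {
	SchedulerCache  internalcache.Cache           // was sched.config.SchedulerCache
	Algorithm       core.ScheduleAlgorithm        // was sched.config.Algorithm
	SchedulingQueue internalqueue.SchedulingQueue // was sched.config.SchedulingQueue
	VolumeBinder    *volumebinder.VolumeBinder    // was sched.config.VolumeBinder
	// ... remaining fields as listed in the struct definition below
}

// NewFromConfig still takes a *factory.Config, but copies each field
// instead of storing the pointer.
func NewFromConfig(config *factory.Config) *Scheduler {
	return &Scheduler{
		SchedulerCache:  config.SchedulerCache,
		Algorithm:       config.Algorithm,
		SchedulingQueue: config.SchedulingQueue,
		VolumeBinder:    config.VolumeBinder,
		// ...
	}
}

// Call sites shorten accordingly, e.g. in the event handlers:
//   before: sched.config.SchedulingQueue.MoveAllToActiveQueue()
//   after:  sched.SchedulingQueue.MoveAllToActiveQueue()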

View File

@ -17,7 +17,9 @@ go_library(
"//pkg/scheduler/factory:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/internal/cache:go_default_library",
"//pkg/scheduler/internal/queue:go_default_library",
"//pkg/scheduler/metrics:go_default_library",
"//pkg/scheduler/volumebinder:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/storage/v1:go_default_library",
"//staging/src/k8s.io/api/storage/v1beta1:go_default_library",

View File

@ -41,7 +41,7 @@ func (sched *Scheduler) onPvAdd(obj interface{}) {
// provisioning and binding process, will not trigger events to schedule pod
// again. So we need to move pods to active queue on PV add for this
// scenario.
sched.config.SchedulingQueue.MoveAllToActiveQueue()
sched.SchedulingQueue.MoveAllToActiveQueue()
}
func (sched *Scheduler) onPvUpdate(old, new interface{}) {
@ -49,15 +49,15 @@ func (sched *Scheduler) onPvUpdate(old, new interface{}) {
// bindings due to conflicts if PVs are updated by PV controller or other
// parties, then scheduler will add pod back to unschedulable queue. We
// need to move pods to active queue on PV update for this scenario.
sched.config.SchedulingQueue.MoveAllToActiveQueue()
sched.SchedulingQueue.MoveAllToActiveQueue()
}
func (sched *Scheduler) onPvcAdd(obj interface{}) {
sched.config.SchedulingQueue.MoveAllToActiveQueue()
sched.SchedulingQueue.MoveAllToActiveQueue()
}
func (sched *Scheduler) onPvcUpdate(old, new interface{}) {
sched.config.SchedulingQueue.MoveAllToActiveQueue()
sched.SchedulingQueue.MoveAllToActiveQueue()
}
func (sched *Scheduler) onStorageClassAdd(obj interface{}) {
@ -74,20 +74,20 @@ func (sched *Scheduler) onStorageClassAdd(obj interface{}) {
// We don't need to invalidate cached results because results will not be
// cached for pod that has unbound immediate PVCs.
if sc.VolumeBindingMode != nil && *sc.VolumeBindingMode == storagev1.VolumeBindingWaitForFirstConsumer {
sched.config.SchedulingQueue.MoveAllToActiveQueue()
sched.SchedulingQueue.MoveAllToActiveQueue()
}
}
func (sched *Scheduler) onServiceAdd(obj interface{}) {
sched.config.SchedulingQueue.MoveAllToActiveQueue()
sched.SchedulingQueue.MoveAllToActiveQueue()
}
func (sched *Scheduler) onServiceUpdate(oldObj interface{}, newObj interface{}) {
sched.config.SchedulingQueue.MoveAllToActiveQueue()
sched.SchedulingQueue.MoveAllToActiveQueue()
}
func (sched *Scheduler) onServiceDelete(obj interface{}) {
sched.config.SchedulingQueue.MoveAllToActiveQueue()
sched.SchedulingQueue.MoveAllToActiveQueue()
}
func (sched *Scheduler) addNodeToCache(obj interface{}) {
@ -97,11 +97,11 @@ func (sched *Scheduler) addNodeToCache(obj interface{}) {
return
}
if err := sched.config.SchedulerCache.AddNode(node); err != nil {
if err := sched.SchedulerCache.AddNode(node); err != nil {
klog.Errorf("scheduler cache AddNode failed: %v", err)
}
sched.config.SchedulingQueue.MoveAllToActiveQueue()
sched.SchedulingQueue.MoveAllToActiveQueue()
}
func (sched *Scheduler) updateNodeInCache(oldObj, newObj interface{}) {
@ -116,7 +116,7 @@ func (sched *Scheduler) updateNodeInCache(oldObj, newObj interface{}) {
return
}
if err := sched.config.SchedulerCache.UpdateNode(oldNode, newNode); err != nil {
if err := sched.SchedulerCache.UpdateNode(oldNode, newNode); err != nil {
klog.Errorf("scheduler cache UpdateNode failed: %v", err)
}
@ -125,8 +125,8 @@ func (sched *Scheduler) updateNodeInCache(oldObj, newObj interface{}) {
// to save processing cycles. We still trigger a move to active queue to cover the case
// that a pod being processed by the scheduler is determined unschedulable. We want this
// pod to be reevaluated when a change in the cluster happens.
if sched.config.SchedulingQueue.NumUnschedulablePods() == 0 || nodeSchedulingPropertiesChanged(newNode, oldNode) {
sched.config.SchedulingQueue.MoveAllToActiveQueue()
if sched.SchedulingQueue.NumUnschedulablePods() == 0 || nodeSchedulingPropertiesChanged(newNode, oldNode) {
sched.SchedulingQueue.MoveAllToActiveQueue()
}
}
@ -151,7 +151,7 @@ func (sched *Scheduler) deleteNodeFromCache(obj interface{}) {
// invalidation and then snapshot the cache itself. If the cache is
// snapshotted before updates are written, we would update equivalence
// cache with stale information which is based on snapshot of old cache.
if err := sched.config.SchedulerCache.RemoveNode(node); err != nil {
if err := sched.SchedulerCache.RemoveNode(node); err != nil {
klog.Errorf("scheduler cache RemoveNode failed: %v", err)
}
}
@ -163,11 +163,11 @@ func (sched *Scheduler) onCSINodeAdd(obj interface{}) {
return
}
if err := sched.config.SchedulerCache.AddCSINode(csiNode); err != nil {
if err := sched.SchedulerCache.AddCSINode(csiNode); err != nil {
klog.Errorf("scheduler cache AddCSINode failed: %v", err)
}
sched.config.SchedulingQueue.MoveAllToActiveQueue()
sched.SchedulingQueue.MoveAllToActiveQueue()
}
func (sched *Scheduler) onCSINodeUpdate(oldObj, newObj interface{}) {
@ -183,11 +183,11 @@ func (sched *Scheduler) onCSINodeUpdate(oldObj, newObj interface{}) {
return
}
if err := sched.config.SchedulerCache.UpdateCSINode(oldCSINode, newCSINode); err != nil {
if err := sched.SchedulerCache.UpdateCSINode(oldCSINode, newCSINode); err != nil {
klog.Errorf("scheduler cache UpdateCSINode failed: %v", err)
}
sched.config.SchedulingQueue.MoveAllToActiveQueue()
sched.SchedulingQueue.MoveAllToActiveQueue()
}
func (sched *Scheduler) onCSINodeDelete(obj interface{}) {
@ -207,13 +207,13 @@ func (sched *Scheduler) onCSINodeDelete(obj interface{}) {
return
}
if err := sched.config.SchedulerCache.RemoveCSINode(csiNode); err != nil {
if err := sched.SchedulerCache.RemoveCSINode(csiNode); err != nil {
klog.Errorf("scheduler cache RemoveCSINode failed: %v", err)
}
}
func (sched *Scheduler) addPodToSchedulingQueue(obj interface{}) {
if err := sched.config.SchedulingQueue.Add(obj.(*v1.Pod)); err != nil {
if err := sched.SchedulingQueue.Add(obj.(*v1.Pod)); err != nil {
utilruntime.HandleError(fmt.Errorf("unable to queue %T: %v", obj, err))
}
}
@ -223,7 +223,7 @@ func (sched *Scheduler) updatePodInSchedulingQueue(oldObj, newObj interface{}) {
if sched.skipPodUpdate(pod) {
return
}
if err := sched.config.SchedulingQueue.Update(oldObj.(*v1.Pod), pod); err != nil {
if err := sched.SchedulingQueue.Update(oldObj.(*v1.Pod), pod); err != nil {
utilruntime.HandleError(fmt.Errorf("unable to update %T: %v", newObj, err))
}
}
@ -244,12 +244,12 @@ func (sched *Scheduler) deletePodFromSchedulingQueue(obj interface{}) {
utilruntime.HandleError(fmt.Errorf("unable to handle object in %T: %T", sched, obj))
return
}
if err := sched.config.SchedulingQueue.Delete(pod); err != nil {
if err := sched.SchedulingQueue.Delete(pod); err != nil {
utilruntime.HandleError(fmt.Errorf("unable to dequeue %T: %v", obj, err))
}
if sched.config.VolumeBinder != nil {
if sched.VolumeBinder != nil {
// Volume binder only wants to keep unassigned pods
sched.config.VolumeBinder.DeletePodBindings(pod)
sched.VolumeBinder.DeletePodBindings(pod)
}
}
@ -260,11 +260,11 @@ func (sched *Scheduler) addPodToCache(obj interface{}) {
return
}
if err := sched.config.SchedulerCache.AddPod(pod); err != nil {
if err := sched.SchedulerCache.AddPod(pod); err != nil {
klog.Errorf("scheduler cache AddPod failed: %v", err)
}
sched.config.SchedulingQueue.AssignedPodAdded(pod)
sched.SchedulingQueue.AssignedPodAdded(pod)
}
func (sched *Scheduler) updatePodInCache(oldObj, newObj interface{}) {
@ -284,11 +284,11 @@ func (sched *Scheduler) updatePodInCache(oldObj, newObj interface{}) {
// invalidation and then snapshot the cache itself. If the cache is
// snapshotted before updates are written, we would update equivalence
// cache with stale information which is based on snapshot of old cache.
if err := sched.config.SchedulerCache.UpdatePod(oldPod, newPod); err != nil {
if err := sched.SchedulerCache.UpdatePod(oldPod, newPod); err != nil {
klog.Errorf("scheduler cache UpdatePod failed: %v", err)
}
sched.config.SchedulingQueue.AssignedPodUpdated(newPod)
sched.SchedulingQueue.AssignedPodUpdated(newPod)
}
func (sched *Scheduler) deletePodFromCache(obj interface{}) {
@ -312,11 +312,11 @@ func (sched *Scheduler) deletePodFromCache(obj interface{}) {
// invalidation and then snapshot the cache itself. If the cache is
// snapshotted before updates are written, we would update equivalence
// cache with stale information which is based on snapshot of old cache.
if err := sched.config.SchedulerCache.RemovePod(pod); err != nil {
if err := sched.SchedulerCache.RemovePod(pod); err != nil {
klog.Errorf("scheduler cache RemovePod failed: %v", err)
}
sched.config.SchedulingQueue.MoveAllToActiveQueue()
sched.SchedulingQueue.MoveAllToActiveQueue()
}
// assignedPod selects pods that are assigned (scheduled and running).
@ -336,7 +336,7 @@ func responsibleForPod(pod *v1.Pod, schedulerName string) bool {
// updated.
func (sched *Scheduler) skipPodUpdate(pod *v1.Pod) bool {
// Non-assumed pods should never be skipped.
isAssumed, err := sched.config.SchedulerCache.IsAssumedPod(pod)
isAssumed, err := sched.SchedulerCache.IsAssumedPod(pod)
if err != nil {
utilruntime.HandleError(fmt.Errorf("failed to check whether pod %s/%s is assumed: %v", pod.Namespace, pod.Name, err))
return false
@ -346,7 +346,7 @@ func (sched *Scheduler) skipPodUpdate(pod *v1.Pod) bool {
}
// Gets the assumed pod from the cache.
assumedPod, err := sched.config.SchedulerCache.GetPod(pod)
assumedPod, err := sched.SchedulerCache.GetPod(pod)
if err != nil {
utilruntime.HandleError(fmt.Errorf("failed to get assumed pod %s/%s from cache: %v", pod.Namespace, pod.Name, err))
return false

View File

@ -42,7 +42,9 @@ import (
"k8s.io/kubernetes/pkg/scheduler/factory"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
"k8s.io/kubernetes/pkg/scheduler/metrics"
"k8s.io/kubernetes/pkg/scheduler/volumebinder"
)
const (
@ -55,12 +57,55 @@ const (
// Scheduler watches for new unscheduled pods. It attempts to find
// nodes that they fit on and writes bindings back to the api server.
type Scheduler struct {
config *factory.Config
// It is expected that changes made via SchedulerCache will be observed
// by NodeLister and Algorithm.
SchedulerCache internalcache.Cache
Algorithm core.ScheduleAlgorithm
GetBinder func(pod *v1.Pod) factory.Binder
// PodConditionUpdater is used only in case of scheduling errors. If we succeed
// with scheduling, PodScheduled condition will be updated in apiserver in /bind
// handler so that binding and setting PodCondition are atomic.
PodConditionUpdater factory.PodConditionUpdater
// PodPreemptor is used to evict pods and update 'NominatedNode' field of
// the preemptor pod.
PodPreemptor factory.PodPreemptor
// Framework runs scheduler plugins at configured extension points.
Framework framework.Framework
// NextPod should be a function that blocks until the next pod
// is available. We don't use a channel for this, because scheduling
// a pod may take some amount of time and we don't want pods to get
// stale while they sit in a channel.
NextPod func() *v1.Pod
// WaitForCacheSync waits for scheduler cache to populate.
// It returns true if it was successful, false if the controller should shutdown.
WaitForCacheSync func() bool
// Error is called if there is an error. It is passed the pod in
// question and the error.
Error func(*v1.Pod, error)
// Recorder is the EventRecorder to use
Recorder events.EventRecorder
// Close this to shut down the scheduler.
StopEverything <-chan struct{}
// VolumeBinder handles PVC/PV binding for the pod.
VolumeBinder *volumebinder.VolumeBinder
// Disable pod preemption or not.
DisablePreemption bool
// SchedulingQueue holds pods to be scheduled
SchedulingQueue internalqueue.SchedulingQueue
}
// Cache returns the cache in scheduler for test to check the data in scheduler.
func (sched *Scheduler) Cache() internalcache.Cache {
return sched.config.SchedulerCache
return sched.SchedulerCache
}
type schedulerOptions struct {
@ -247,31 +292,39 @@ func initPolicyFromConfigMap(client clientset.Interface, policyRef *kubeschedule
func NewFromConfig(config *factory.Config) *Scheduler {
metrics.Register()
return &Scheduler{
config: config,
SchedulerCache: config.SchedulerCache,
Algorithm: config.Algorithm,
GetBinder: config.GetBinder,
PodConditionUpdater: config.PodConditionUpdater,
PodPreemptor: config.PodPreemptor,
Framework: config.Framework,
NextPod: config.NextPod,
WaitForCacheSync: config.WaitForCacheSync,
Error: config.Error,
Recorder: config.Recorder,
StopEverything: config.StopEverything,
VolumeBinder: config.VolumeBinder,
DisablePreemption: config.DisablePreemption,
SchedulingQueue: config.SchedulingQueue,
}
}
// Run begins watching and scheduling. It waits for cache to be synced, then starts a goroutine and returns immediately.
func (sched *Scheduler) Run() {
if !sched.config.WaitForCacheSync() {
if !sched.WaitForCacheSync() {
return
}
go wait.Until(sched.scheduleOne, 0, sched.config.StopEverything)
}
// Config returns scheduler's config pointer. It is exposed for testing purposes.
func (sched *Scheduler) Config() *factory.Config {
return sched.config
go wait.Until(sched.scheduleOne, 0, sched.StopEverything)
}
// recordSchedulingFailure records an event for the pod that indicates the
// pod has failed to schedule.
// NOTE: This function modifies "pod". "pod" should be copied before being passed.
func (sched *Scheduler) recordSchedulingFailure(pod *v1.Pod, err error, reason string, message string) {
sched.config.Error(pod, err)
sched.config.Recorder.Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", message)
if err := sched.config.PodConditionUpdater.Update(pod, &v1.PodCondition{
sched.Error(pod, err)
sched.Recorder.Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", message)
if err := sched.PodConditionUpdater.Update(pod, &v1.PodCondition{
Type: v1.PodScheduled,
Status: v1.ConditionFalse,
Reason: reason,
@ -284,7 +337,7 @@ func (sched *Scheduler) recordSchedulingFailure(pod *v1.Pod, err error, reason s
// schedule implements the scheduling algorithm and returns the suggested result (host,
// evaluated nodes number, feasible nodes number).
func (sched *Scheduler) schedule(pod *v1.Pod, pluginContext *framework.PluginContext) (core.ScheduleResult, error) {
result, err := sched.config.Algorithm.Schedule(pod, pluginContext)
result, err := sched.Algorithm.Schedule(pod, pluginContext)
if err != nil {
pod = pod.DeepCopy()
sched.recordSchedulingFailure(pod, err, v1.PodReasonUnschedulable, err.Error())
@ -297,13 +350,13 @@ func (sched *Scheduler) schedule(pod *v1.Pod, pluginContext *framework.PluginCon
// If it succeeds, it adds the name of the node where preemption has happened to the pod spec.
// It returns the node name and an error if any.
func (sched *Scheduler) preempt(fwk framework.Framework, preemptor *v1.Pod, scheduleErr error) (string, error) {
preemptor, err := sched.config.PodPreemptor.GetUpdatedPod(preemptor)
preemptor, err := sched.PodPreemptor.GetUpdatedPod(preemptor)
if err != nil {
klog.Errorf("Error getting the updated preemptor pod object: %v", err)
return "", err
}
node, victims, nominatedPodsToClear, err := sched.config.Algorithm.Preempt(preemptor, scheduleErr)
node, victims, nominatedPodsToClear, err := sched.Algorithm.Preempt(preemptor, scheduleErr)
if err != nil {
klog.Errorf("Error preempting victims to make room for %v/%v: %v", preemptor.Namespace, preemptor.Name, err)
return "", err
@ -314,18 +367,18 @@ func (sched *Scheduler) preempt(fwk framework.Framework, preemptor *v1.Pod, sche
// Update the scheduling queue with the nominated pod information. Without
// this, there would be a race condition between the next scheduling cycle
// and the time the scheduler receives a Pod Update for the nominated pod.
sched.config.SchedulingQueue.UpdateNominatedPodForNode(preemptor, nodeName)
sched.SchedulingQueue.UpdateNominatedPodForNode(preemptor, nodeName)
// Make a call to update nominated node name of the pod on the API server.
err = sched.config.PodPreemptor.SetNominatedNodeName(preemptor, nodeName)
err = sched.PodPreemptor.SetNominatedNodeName(preemptor, nodeName)
if err != nil {
klog.Errorf("Error in preemption process. Cannot set 'NominatedPod' on pod %v/%v: %v", preemptor.Namespace, preemptor.Name, err)
sched.config.SchedulingQueue.DeleteNominatedPodIfExists(preemptor)
sched.SchedulingQueue.DeleteNominatedPodIfExists(preemptor)
return "", err
}
for _, victim := range victims {
if err := sched.config.PodPreemptor.DeletePod(victim); err != nil {
if err := sched.PodPreemptor.DeletePod(victim); err != nil {
klog.Errorf("Error preempting pod %v/%v: %v", victim.Namespace, victim.Name, err)
return "", err
}
@ -333,7 +386,7 @@ func (sched *Scheduler) preempt(fwk framework.Framework, preemptor *v1.Pod, sche
if waitingPod := fwk.GetWaitingPod(victim.UID); waitingPod != nil {
waitingPod.Reject("preempted")
}
sched.config.Recorder.Eventf(victim, preemptor, v1.EventTypeNormal, "Preempted", "Preempting", "Preempted by %v/%v on node %v", preemptor.Namespace, preemptor.Name, nodeName)
sched.Recorder.Eventf(victim, preemptor, v1.EventTypeNormal, "Preempted", "Preempting", "Preempted by %v/%v on node %v", preemptor.Namespace, preemptor.Name, nodeName)
}
metrics.PreemptionVictims.Set(float64(len(victims)))
@ -344,7 +397,7 @@ func (sched *Scheduler) preempt(fwk framework.Framework, preemptor *v1.Pod, sche
// function of generic_scheduler.go returns the pod itself for removal of
// the 'NominatedPod' field.
for _, p := range nominatedPodsToClear {
rErr := sched.config.PodPreemptor.RemoveNominatedNodeName(p)
rErr := sched.PodPreemptor.RemoveNominatedNodeName(p)
if rErr != nil {
klog.Errorf("Cannot remove 'NominatedPod' field of pod: %v", rErr)
// We do not return as this error is not critical.
@ -357,7 +410,7 @@ func (sched *Scheduler) preempt(fwk framework.Framework, preemptor *v1.Pod, sche
//
// This function modifies assumed if volume binding is required.
func (sched *Scheduler) assumeVolumes(assumed *v1.Pod, host string) (allBound bool, err error) {
allBound, err = sched.config.VolumeBinder.Binder.AssumePodVolumes(assumed, host)
allBound, err = sched.VolumeBinder.Binder.AssumePodVolumes(assumed, host)
if err != nil {
sched.recordSchedulingFailure(assumed, err, SchedulerError,
fmt.Sprintf("AssumePodVolumes failed: %v", err))
@ -372,12 +425,12 @@ func (sched *Scheduler) assumeVolumes(assumed *v1.Pod, host string) (allBound bo
// retry scheduling.
func (sched *Scheduler) bindVolumes(assumed *v1.Pod) error {
klog.V(5).Infof("Trying to bind volumes for pod \"%v/%v\"", assumed.Namespace, assumed.Name)
err := sched.config.VolumeBinder.Binder.BindPodVolumes(assumed)
err := sched.VolumeBinder.Binder.BindPodVolumes(assumed)
if err != nil {
klog.V(1).Infof("Failed to bind volumes for pod \"%v/%v\": %v", assumed.Namespace, assumed.Name, err)
// Unassume the Pod and retry scheduling
if forgetErr := sched.config.SchedulerCache.ForgetPod(assumed); forgetErr != nil {
if forgetErr := sched.SchedulerCache.ForgetPod(assumed); forgetErr != nil {
klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
}
@ -398,7 +451,7 @@ func (sched *Scheduler) assume(assumed *v1.Pod, host string) error {
// immediately.
assumed.Spec.NodeName = host
if err := sched.config.SchedulerCache.AssumePod(assumed); err != nil {
if err := sched.SchedulerCache.AssumePod(assumed); err != nil {
klog.Errorf("scheduler cache AssumePod failed: %v", err)
// This is most probably result of a BUG in retrying logic.
@ -411,8 +464,8 @@ func (sched *Scheduler) assume(assumed *v1.Pod, host string) error {
return err
}
// if "assumed" is a nominated pod, we should remove it from internal cache
if sched.config.SchedulingQueue != nil {
sched.config.SchedulingQueue.DeleteNominatedPodIfExists(assumed)
if sched.SchedulingQueue != nil {
sched.SchedulingQueue.DeleteNominatedPodIfExists(assumed)
}
return nil
@ -422,14 +475,14 @@ func (sched *Scheduler) assume(assumed *v1.Pod, host string) error {
// handle binding metrics internally.
func (sched *Scheduler) bind(assumed *v1.Pod, targetNode string, pluginContext *framework.PluginContext) error {
bindingStart := time.Now()
bindStatus := sched.config.Framework.RunBindPlugins(pluginContext, assumed, targetNode)
bindStatus := sched.Framework.RunBindPlugins(pluginContext, assumed, targetNode)
var err error
if !bindStatus.IsSuccess() {
if bindStatus.Code() == framework.Skip {
// All bind plugins chose to skip binding of this pod, call original binding function.
// If binding succeeds then PodScheduled condition will be updated in apiserver so that
// it's atomic with setting host.
err = sched.config.GetBinder(assumed).Bind(&v1.Binding{
err = sched.GetBinder(assumed).Bind(&v1.Binding{
ObjectMeta: metav1.ObjectMeta{Namespace: assumed.Namespace, Name: assumed.Name, UID: assumed.UID},
Target: v1.ObjectReference{
Kind: "Node",
@ -440,12 +493,12 @@ func (sched *Scheduler) bind(assumed *v1.Pod, targetNode string, pluginContext *
err = fmt.Errorf("Bind failure, code: %d: %v", bindStatus.Code(), bindStatus.Message())
}
}
if finErr := sched.config.SchedulerCache.FinishBinding(assumed); finErr != nil {
if finErr := sched.SchedulerCache.FinishBinding(assumed); finErr != nil {
klog.Errorf("scheduler cache FinishBinding failed: %v", finErr)
}
if err != nil {
klog.V(1).Infof("Failed to bind pod: %v/%v", assumed.Namespace, assumed.Name)
if err := sched.config.SchedulerCache.ForgetPod(assumed); err != nil {
if err := sched.SchedulerCache.ForgetPod(assumed); err != nil {
klog.Errorf("scheduler cache ForgetPod failed: %v", err)
}
return err
@ -455,21 +508,21 @@ func (sched *Scheduler) bind(assumed *v1.Pod, targetNode string, pluginContext *
metrics.DeprecatedBindingLatency.Observe(metrics.SinceInMicroseconds(bindingStart))
metrics.SchedulingLatency.WithLabelValues(metrics.Binding).Observe(metrics.SinceInSeconds(bindingStart))
metrics.DeprecatedSchedulingLatency.WithLabelValues(metrics.Binding).Observe(metrics.SinceInSeconds(bindingStart))
sched.config.Recorder.Eventf(assumed, nil, v1.EventTypeNormal, "Scheduled", "Binding", "Successfully assigned %v/%v to %v", assumed.Namespace, assumed.Name, targetNode)
sched.Recorder.Eventf(assumed, nil, v1.EventTypeNormal, "Scheduled", "Binding", "Successfully assigned %v/%v to %v", assumed.Namespace, assumed.Name, targetNode)
return nil
}
// scheduleOne does the entire scheduling workflow for a single pod. It is serialized on the scheduling algorithm's host fitting.
func (sched *Scheduler) scheduleOne() {
fwk := sched.config.Framework
fwk := sched.Framework
pod := sched.config.NextPod()
pod := sched.NextPod()
// pod could be nil when schedulerQueue is closed
if pod == nil {
return
}
if pod.DeletionTimestamp != nil {
sched.config.Recorder.Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", "skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
sched.Recorder.Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", "skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
klog.V(3).Infof("Skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
return
}
@ -486,7 +539,7 @@ func (sched *Scheduler) scheduleOne() {
// will fit due to the preemption. It is also possible that a different pod will schedule
// into the resources that were preempted, but this is harmless.
if fitError, ok := err.(*core.FitError); ok {
if sched.config.DisablePreemption {
if sched.DisablePreemption {
klog.V(3).Infof("Pod priority feature is not enabled or preemption is disabled by scheduler configuration." +
" No preemption is performed.")
} else {

View File

@ -767,7 +767,7 @@ func setupTestSchedulerWithVolumeBinding(fakeVolumeBinder *volumebinder.VolumeBi
s, bindingChan, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, predicateMap, recorder)
informerFactory.Start(stop)
informerFactory.WaitForCacheSync(stop)
s.config.VolumeBinder = fakeVolumeBinder
s.VolumeBinder = fakeVolumeBinder
return s, bindingChan, errChan
}
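With the fields exported and the Config() accessor removed in the hunks above, tests set and inspect scheduler internals directly; the integration-test hunks further down apply the same mechanical rewrite from outside the package. A hypothetical snippet combining both forms (setupTestScheduler and fakeVolumeBinder are the helpers from this test file; labels is k8s.io/apimachinery/pkg/labels; error handling elided):

s, bindingChan, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, predicateMap, recorder)
s.VolumeBinder = fakeVolumeBinder // set the collaborator directly; previously s.config.VolumeBinder
cachedPods, err := s.SchedulerCache.List(labels.Everything()) // previously sched.Config().SchedulerCache.List(...)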

View File

@ -276,15 +276,13 @@ priorities: []
t.Fatalf("couldn't make scheduler config: %v", err)
}
config := sched.Config()
// Verify that the config is applied correctly.
schedPredicates := sets.NewString()
for k := range config.Algorithm.Predicates() {
for k := range sched.Algorithm.Predicates() {
schedPredicates.Insert(k)
}
schedPrioritizers := sets.NewString()
for _, p := range config.Algorithm.Prioritizers() {
for _, p := range sched.Algorithm.Prioritizers() {
schedPrioritizers.Insert(p.Name)
}
if !schedPredicates.Equal(test.expectedPredicates) {

View File

@ -204,6 +204,13 @@ func initTestSchedulerWithOptions(
// set DisablePreemption option
context.schedulerConfig.DisablePreemption = disablePreemption
eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{
Interface: context.clientSet.EventsV1beta1().Events(""),
})
context.schedulerConfig.Recorder = eventBroadcaster.NewRecorder(
legacyscheme.Scheme,
v1.DefaultSchedulerName,
)
context.scheduler = scheduler.NewFromConfig(context.schedulerConfig)
@ -224,13 +231,6 @@ func initTestSchedulerWithOptions(
controller.WaitForCacheSync("scheduler", context.schedulerConfig.StopEverything, podInformer.Informer().HasSynced)
}
eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{
Interface: context.clientSet.EventsV1beta1().Events(""),
})
context.schedulerConfig.Recorder = eventBroadcaster.NewRecorder(
legacyscheme.Scheme,
v1.DefaultSchedulerName,
)
stopCh := make(chan struct{})
eventBroadcaster.StartRecordingToSink(stopCh)
@ -727,7 +727,7 @@ func waitForPDBsStable(context *testContext, pdbs []*policy.PodDisruptionBudget,
// waitCachedPodsStable waits until scheduler cache has the given pods.
func waitCachedPodsStable(context *testContext, pods []*v1.Pod) error {
return wait.Poll(time.Second, 30*time.Second, func() (bool, error) {
cachedPods, err := context.scheduler.Config().SchedulerCache.List(labels.Everything())
cachedPods, err := context.scheduler.SchedulerCache.List(labels.Everything())
if err != nil {
return false, err
}
@ -739,7 +739,7 @@ func waitCachedPodsStable(context *testContext, pods []*v1.Pod) error {
if err1 != nil {
return false, err1
}
cachedPod, err2 := context.scheduler.Config().SchedulerCache.GetPod(actualPod)
cachedPod, err2 := context.scheduler.SchedulerCache.GetPod(actualPod)
if err2 != nil || cachedPod == nil {
return false, err2
}

View File

@ -123,6 +123,13 @@ func initTestSchedulerWithOptions(
// set DisablePreemption option
context.schedulerConfig.DisablePreemption = false
eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{
Interface: context.clientSet.EventsV1beta1().Events(""),
})
context.schedulerConfig.Recorder = eventBroadcaster.NewRecorder(
legacyscheme.Scheme,
v1.DefaultSchedulerName,
)
context.scheduler = scheduler.NewFromConfig(context.schedulerConfig)
@ -137,13 +144,6 @@ func initTestSchedulerWithOptions(
context.informerFactory.Storage().V1beta1().CSINodes(),
)
eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{
Interface: context.clientSet.EventsV1beta1().Events(""),
})
context.schedulerConfig.Recorder = eventBroadcaster.NewRecorder(
legacyscheme.Scheme,
v1.DefaultSchedulerName,
)
stopCh := make(chan struct{})
eventBroadcaster.StartRecordingToSink(stopCh)
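
The two integration-test hunks above move the event broadcaster and Recorder setup ahead of scheduler construction for a reason the diff only implies: the Scheduler no longer holds the *factory.Config pointer, so NewFromConfig copies schedulerConfig.Recorder (and every other field) at construction time, and assignments made to schedulerConfig afterwards are never seen by the running scheduler. The required ordering, assembled from the lines in the hunks above:

eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{
	Interface: context.clientSet.EventsV1beta1().Events(""),
})
// Recorder must be set on the factory.Config before NewFromConfig copies it ...
context.schedulerConfig.Recorder = eventBroadcaster.NewRecorder(
	legacyscheme.Scheme,
	v1.DefaultSchedulerName,
)
// ... because the Scheduler keeps its own copy of each field from here on.
context.scheduler = scheduler.NewFromConfig(context.schedulerConfig)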