merge MakeDefaultErrorFunc into handleSchedulingFailure
This commit is contained in:
parent 71481bf247
commit 4f77732540
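For context, the change removes the Scheduler's `Error func(*framework.QueuedPodInfo, error)` callback, previously built by `MakeDefaultErrorFunc`, and replaces it with an exported `FailureHandler FailureHandlerFn` field whose default is the merged `handleSchedulingFailure`. Below is a minimal sketch of the new hook's shape, not taken from the commit itself; it assumes it is compiled inside the kubernetes repo against the post-commit pkg/scheduler package, and the handler name `logOnly` and the pod values are made up for illustration.

package main

import (
	"context"
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/kubernetes/pkg/scheduler"
	"k8s.io/kubernetes/pkg/scheduler/framework"
)

// logOnly has the FailureHandlerFn signature introduced by this commit. It only
// logs; the real default (handleSchedulingFailure) also requeues the pod,
// records a FailedScheduling event, and updates the pod condition.
func logOnly(_ context.Context, _ framework.Framework, podInfo *framework.QueuedPodInfo,
	err error, reason string, _ *framework.NominatingInfo) {
	fmt.Printf("pod %s/%s failed scheduling (%s): %v\n",
		podInfo.Pod.Namespace, podInfo.Pod.Name, reason, err)
}

func main() {
	// The field is exported, so tests (and embedders) can swap the handler after
	// construction, which is what the updated unit tests in this commit do.
	var sched scheduler.Scheduler
	sched.FailureHandler = logOnly
	sched.FailureHandler(context.Background(), nil,
		&framework.QueuedPodInfo{PodInfo: framework.NewPodInfo(&v1.Pod{})},
		fmt.Errorf("no nodes available"), v1.PodReasonUnschedulable, nil)
}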
@@ -301,7 +301,6 @@ func TestSchedulerWithExtenders(t *testing.T) {
nil,
nil,
nil,
nil,
emptySnapshot,
schedulerapi.DefaultPercentageOfNodesToScore)
podIgnored := &v1.Pod{}
@@ -138,7 +138,7 @@ type Evaluator struct {
func (ev *Evaluator) Preempt(ctx context.Context, pod *v1.Pod, m framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status) {
// 0) Fetch the latest version of <pod>.
// It's safe to directly fetch pod here. Because the informer cache has already been
// initialized when creating the Scheduler obj, i.e., factory.go#MakeDefaultErrorFunc().
// initialized when creating the Scheduler obj.
// However, tests may need to manually initialize the shared pod informer.
podNamespace, podName := pod.Namespace, pod.Name
pod, err := ev.PodLister.Pods(pod.Namespace).Get(pod.Name)
@@ -26,6 +26,7 @@ import (
"time"

v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
@@ -129,7 +130,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
klog.ErrorS(err, "Error selecting node for pod", "pod", klog.KObj(pod))
metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
}
sched.handleSchedulingFailure(ctx, fwk, podInfo, err, v1.PodReasonUnschedulable, nominatingInfo)
sched.FailureHandler(ctx, fwk, podInfo, err, v1.PodReasonUnschedulable, nominatingInfo)
return
}
metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInSeconds(start))
@@ -146,7 +147,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
// This relies on the fact that Error will check if the pod has been bound
// to a node and if so will not add it back to the unscheduled pods queue
// (otherwise this would cause an infinite loop).
sched.handleSchedulingFailure(ctx, fwk, assumedPodInfo, err, SchedulerError, clearNominatedNode)
sched.FailureHandler(ctx, fwk, assumedPodInfo, err, SchedulerError, clearNominatedNode)
return
}

@@ -158,7 +159,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
if forgetErr := sched.Cache.ForgetPod(assumedPod); forgetErr != nil {
klog.ErrorS(forgetErr, "Scheduler cache ForgetPod failed")
}
sched.handleSchedulingFailure(ctx, fwk, assumedPodInfo, sts.AsError(), SchedulerError, clearNominatedNode)
sched.FailureHandler(ctx, fwk, assumedPodInfo, sts.AsError(), SchedulerError, clearNominatedNode)
return
}

@@ -178,7 +179,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
if forgetErr := sched.Cache.ForgetPod(assumedPod); forgetErr != nil {
klog.ErrorS(forgetErr, "Scheduler cache ForgetPod failed")
}
sched.handleSchedulingFailure(ctx, fwk, assumedPodInfo, runPermitStatus.AsError(), reason, clearNominatedNode)
sched.FailureHandler(ctx, fwk, assumedPodInfo, runPermitStatus.AsError(), reason, clearNominatedNode)
return
}

@@ -221,7 +222,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
return assumedPod.UID != pod.UID
})
}
sched.handleSchedulingFailure(ctx, fwk, assumedPodInfo, waitOnPermitStatus.AsError(), reason, clearNominatedNode)
sched.FailureHandler(ctx, fwk, assumedPodInfo, waitOnPermitStatus.AsError(), reason, clearNominatedNode)
return
}

@@ -239,7 +240,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
// TODO(#103853): de-duplicate the logic.
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, nil)
}
sched.handleSchedulingFailure(ctx, fwk, assumedPodInfo, preBindStatus.AsError(), SchedulerError, clearNominatedNode)
sched.FailureHandler(ctx, fwk, assumedPodInfo, preBindStatus.AsError(), SchedulerError, clearNominatedNode)
return
}

@@ -256,7 +257,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
// TODO(#103853): de-duplicate the logic.
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, nil)
}
sched.handleSchedulingFailure(ctx, fwk, assumedPodInfo, fmt.Errorf("binding rejected: %w", err), SchedulerError, clearNominatedNode)
sched.FailureHandler(ctx, fwk, assumedPodInfo, fmt.Errorf("binding rejected: %w", err), SchedulerError, clearNominatedNode)
return
}
// Calculating nodeResourceString can be heavy. Avoid it if klog verbosity is below 2.
@@ -810,7 +811,49 @@ func getAttemptsLabel(p *framework.QueuedPodInfo) string {
// handleSchedulingFailure records an event for the pod that indicates the
// pod has failed to schedule. Also, update the pod condition and nominated node name if set.
func (sched *Scheduler) handleSchedulingFailure(ctx context.Context, fwk framework.Framework, podInfo *framework.QueuedPodInfo, err error, reason string, nominatingInfo *framework.NominatingInfo) {
sched.Error(podInfo, err)
pod := podInfo.Pod
if err == ErrNoNodesAvailable {
klog.V(2).InfoS("Unable to schedule pod; no nodes are registered to the cluster; waiting", "pod", klog.KObj(pod))
} else if fitError, ok := err.(*framework.FitError); ok {
// Inject UnschedulablePlugins to PodInfo, which will be used later for moving Pods between queues efficiently.
podInfo.UnschedulablePlugins = fitError.Diagnosis.UnschedulablePlugins
klog.V(2).InfoS("Unable to schedule pod; no fit; waiting", "pod", klog.KObj(pod), "err", err)
} else if apierrors.IsNotFound(err) {
klog.V(2).InfoS("Unable to schedule pod, possibly due to node not found; waiting", "pod", klog.KObj(pod), "err", err)
if errStatus, ok := err.(apierrors.APIStatus); ok && errStatus.Status().Details.Kind == "node" {
nodeName := errStatus.Status().Details.Name
// when node is not found, We do not remove the node right away. Trying again to get
// the node and if the node is still not found, then remove it from the scheduler cache.
_, err := fwk.ClientSet().CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
if err != nil && apierrors.IsNotFound(err) {
node := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: nodeName}}
if err := sched.Cache.RemoveNode(&node); err != nil {
klog.V(4).InfoS("Node is not found; failed to remove it from the cache", "node", node.Name)
}
}
}
} else {
klog.ErrorS(err, "Error scheduling pod; retrying", "pod", klog.KObj(pod))
}

// Check if the Pod exists in informer cache.
podLister := fwk.SharedInformerFactory().Core().V1().Pods().Lister()
cachedPod, e := podLister.Pods(pod.Namespace).Get(pod.Name)
if e != nil {
klog.InfoS("Pod doesn't exist in informer cache", "pod", klog.KObj(pod), "err", e)
} else {
// In the case of extender, the pod may have been bound successfully, but timed out returning its response to the scheduler.
// It could result in the live version to carry .spec.nodeName, and that's inconsistent with the internal-queued version.
if len(cachedPod.Spec.NodeName) != 0 {
klog.InfoS("Pod has been assigned to node. Abort adding it back to queue.", "pod", klog.KObj(pod), "node", cachedPod.Spec.NodeName)
} else {
// As <cachedPod> is from SharedInformer, we need to do a DeepCopy() here.
podInfo.PodInfo = framework.NewPodInfo(cachedPod.DeepCopy())
if err := sched.SchedulingQueue.AddUnschedulableIfNotPresent(podInfo, sched.SchedulingQueue.SchedulingCycle()); err != nil {
klog.ErrorS(err, "Error occurred")
}
}
}

// Update the scheduling queue with the nominated pod information. Without
// this, there would be a race condition between the next scheduling cycle
@@ -820,7 +863,11 @@ func (sched *Scheduler) handleSchedulingFailure(ctx context.Context, fwk framewo
sched.SchedulingQueue.AddNominatedPod(podInfo.PodInfo, nominatingInfo)
}

pod := podInfo.Pod
if err == nil {
// Only tests can reach here.
return
}

msg := truncateMessage(err.Error())
fwk.EventRecorder().Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", msg)
if err := updatePod(ctx, sched.client, pod, &v1.PodCondition{
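The node-not-found branch added to handleSchedulingFailure above keys off the error's StatusDetails, re-checks the API server, and only then evicts the node from the scheduler cache. As a reference, here is a small standalone sketch of building that kind of error with the apimachinery helpers the code relies on; it is not part of the commit, and the node name is illustrative.

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
)

func main() {
	// NewNotFound stores the resource ("node") in Details.Kind and the object
	// name in Details.Name, which is what the handler above inspects before
	// confirming the node is really gone and removing it from its cache.
	var err error = apierrors.NewNotFound(v1.Resource("node"), "node-foo")

	if apierrors.IsNotFound(err) {
		if errStatus, ok := err.(apierrors.APIStatus); ok {
			details := errStatus.Status().Details
			fmt.Println(details.Kind, details.Name) // prints: node node-foo
		}
	}
}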
@@ -590,10 +590,6 @@ func TestSchedulerScheduleOne(t *testing.T) {
func() *framework.QueuedPodInfo {
return &framework.QueuedPodInfo{PodInfo: framework.NewPodInfo(item.sendPod)}
},
func(p *framework.QueuedPodInfo, err error) {
gotPod = p.Pod
gotError = err
},
nil,
internalqueue.NewTestQueue(ctx, nil),
profile.Map{
@@ -605,6 +601,13 @@ func TestSchedulerScheduleOne(t *testing.T) {
s.SchedulePod = func(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (ScheduleResult, error) {
return item.mockResult.result, item.mockResult.err
}
s.FailureHandler = func(_ context.Context, fwk framework.Framework, p *framework.QueuedPodInfo, err error, _ string, _ *framework.NominatingInfo) {
gotPod = p.Pod
gotError = err

msg := truncateMessage(err.Error())
fwk.EventRecorder().Eventf(p.Pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", msg)
}
called := make(chan struct{})
stopFunc := eventBroadcaster.StartEventWatcher(func(obj runtime.Object) {
e, _ := obj.(*eventsv1.Event)
@@ -2026,7 +2029,6 @@ func TestSchedulerSchedulePod(t *testing.T) {
nil,
nil,
nil,
nil,
snapshot,
schedulerapi.DefaultPercentageOfNodesToScore)
informerFactory.Start(ctx.Done())
@@ -2328,7 +2330,6 @@ func TestZeroRequest(t *testing.T) {
nil,
nil,
nil,
nil,
snapshot,
schedulerapi.DefaultPercentageOfNodesToScore)

@@ -2512,7 +2513,6 @@ func TestPreferNominatedNodeFilterCallCounts(t *testing.T) {
nil,
nil,
nil,
nil,
snapshot,
schedulerapi.DefaultPercentageOfNodesToScore)

@@ -2574,7 +2574,6 @@ func makeScheduler(nodes []*v1.Node) *Scheduler {
nil,
nil,
nil,
nil,
emptySnapshot,
schedulerapi.DefaultPercentageOfNodesToScore)
cache.UpdateSnapshot(s.nodeInfoSnapshot)
@@ -2671,9 +2670,6 @@ func setupTestScheduler(ctx context.Context, queuedPodStore *clientcache.FIFO, c
func() *framework.QueuedPodInfo {
return &framework.QueuedPodInfo{PodInfo: framework.NewPodInfo(clientcache.Pop(queuedPodStore).(*v1.Pod))}
},
func(p *framework.QueuedPodInfo, err error) {
errChan <- err
},
nil,
schedulingQueue,
profile.Map{
@@ -2682,6 +2678,12 @@ func setupTestScheduler(ctx context.Context, queuedPodStore *clientcache.FIFO, c
client,
internalcache.NewEmptySnapshot(),
schedulerapi.DefaultPercentageOfNodesToScore)
sched.FailureHandler = func(_ context.Context, _ framework.Framework, p *framework.QueuedPodInfo, err error, _ string, _ *framework.NominatingInfo) {
errChan <- err

msg := truncateMessage(err.Error())
fwk.EventRecorder().Eventf(p.Pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", msg)
}
return sched, bindingChan, errChan
}

@@ -23,7 +23,6 @@ import (
"time"

v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
@@ -31,7 +30,6 @@
"k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
@@ -74,9 +72,8 @@ type Scheduler struct {
// stale while they sit in a channel.
NextPod func() *framework.QueuedPodInfo

// Error is called if there is an error. It is passed the pod in
// question, and the error
Error func(*framework.QueuedPodInfo, error)
// FailureHandler is called upon a scheduling failure.
FailureHandler FailureHandlerFn

// SchedulePod tries to schedule the given pod to one of the nodes in the node list.
// Return a struct of ScheduleResult with the name of suggested host on success,
@@ -318,7 +315,6 @@ func New(client clientset.Interface,
schedulerCache,
extenders,
internalqueue.MakeNextPodFunc(podQueue),
MakeDefaultErrorFunc(client, podLister, podQueue, schedulerCache),
stopEverything,
podQueue,
profiles,
@@ -348,56 +344,6 @@ func (sched *Scheduler) Run(ctx context.Context) {
sched.SchedulingQueue.Close()
}

// MakeDefaultErrorFunc construct a function to handle pod scheduler error
func MakeDefaultErrorFunc(client clientset.Interface, podLister corelisters.PodLister, podQueue internalqueue.SchedulingQueue, schedulerCache internalcache.Cache) func(*framework.QueuedPodInfo, error) {
return func(podInfo *framework.QueuedPodInfo, err error) {
pod := podInfo.Pod
if err == ErrNoNodesAvailable {
klog.V(2).InfoS("Unable to schedule pod; no nodes are registered to the cluster; waiting", "pod", klog.KObj(pod))
} else if fitError, ok := err.(*framework.FitError); ok {
// Inject UnschedulablePlugins to PodInfo, which will be used later for moving Pods between queues efficiently.
podInfo.UnschedulablePlugins = fitError.Diagnosis.UnschedulablePlugins
klog.V(2).InfoS("Unable to schedule pod; no fit; waiting", "pod", klog.KObj(pod), "err", err)
} else if apierrors.IsNotFound(err) {
klog.V(2).InfoS("Unable to schedule pod, possibly due to node not found; waiting", "pod", klog.KObj(pod), "err", err)
if errStatus, ok := err.(apierrors.APIStatus); ok && errStatus.Status().Details.Kind == "node" {
nodeName := errStatus.Status().Details.Name
// when node is not found, We do not remove the node right away. Trying again to get
// the node and if the node is still not found, then remove it from the scheduler cache.
_, err := client.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
if err != nil && apierrors.IsNotFound(err) {
node := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: nodeName}}
if err := schedulerCache.RemoveNode(&node); err != nil {
klog.V(4).InfoS("Node is not found; failed to remove it from the cache", "node", node.Name)
}
}
}
} else {
klog.ErrorS(err, "Error scheduling pod; retrying", "pod", klog.KObj(pod))
}

// Check if the Pod exists in informer cache.
cachedPod, err := podLister.Pods(pod.Namespace).Get(pod.Name)
if err != nil {
klog.InfoS("Pod doesn't exist in informer cache", "pod", klog.KObj(pod), "err", err)
return
}

// In the case of extender, the pod may have been bound successfully, but timed out returning its response to the scheduler.
// It could result in the live version to carry .spec.nodeName, and that's inconsistent with the internal-queued version.
if len(cachedPod.Spec.NodeName) != 0 {
klog.InfoS("Pod has been assigned to node. Abort adding it back to queue.", "pod", klog.KObj(pod), "node", cachedPod.Spec.NodeName)
return
}

// As <cachedPod> is from SharedInformer, we need to do a DeepCopy() here.
podInfo.PodInfo = framework.NewPodInfo(cachedPod.DeepCopy())
if err := podQueue.AddUnschedulableIfNotPresent(podInfo, podQueue.SchedulingCycle()); err != nil {
klog.ErrorS(err, "Error occurred")
}
}
}

// NewInformerFactory creates a SharedInformerFactory and initializes a scheduler specific
// in-place podInformer.
func NewInformerFactory(cs clientset.Interface, resyncPeriod time.Duration) informers.SharedInformerFactory {
@@ -464,12 +410,13 @@ func buildExtenders(extenders []schedulerapi.Extender, profiles []schedulerapi.K
return fExtenders, nil
}

type FailureHandlerFn func(ctx context.Context, fwk framework.Framework, podInfo *framework.QueuedPodInfo, err error, reason string, nominatingInfo *framework.NominatingInfo)

// newScheduler creates a Scheduler object.
func newScheduler(
cache internalcache.Cache,
extenders []framework.Extender,
nextPod func() *framework.QueuedPodInfo,
Error func(*framework.QueuedPodInfo, error),
stopEverything <-chan struct{},
schedulingQueue internalqueue.SchedulingQueue,
profiles profile.Map,
@@ -480,7 +427,6 @@ func newScheduler(
Cache: cache,
Extenders: extenders,
NextPod: nextPod,
Error: Error,
StopEverything: stopEverything,
SchedulingQueue: schedulingQueue,
Profiles: profiles,
@@ -489,6 +435,7 @@
percentageOfNodesToScore: percentageOfNodesToScore,
}
sched.SchedulePod = sched.schedulePod
sched.FailureHandler = sched.handleSchedulingFailure
return &sched
}

@@ -30,12 +30,15 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/events"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort"
frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
@@ -226,7 +229,7 @@ func TestSchedulerCreation(t *testing.T) {
}
}

func TestDefaultErrorFunc(t *testing.T) {
func TestFailureHandler(t *testing.T) {
testPod := st.MakePod().Name("test-pod").Namespace(v1.NamespaceDefault).Obj()
testPodUpdated := testPod.DeepCopy()
testPodUpdated.Labels = map[string]string{"foo": ""}
@@ -259,8 +262,8 @@ func TestDefaultErrorFunc(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
stopCh := make(chan struct{})
defer close(stopCh)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

client := fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*testPod}})
informerFactory := informers.NewSharedInformerFactory(client, 0)
@@ -269,7 +272,7 @@ func TestDefaultErrorFunc(t *testing.T) {
podInformer.Informer().GetStore().Add(testPod)

queue := internalqueue.NewPriorityQueue(nil, informerFactory, internalqueue.WithClock(testingclock.NewFakeClock(time.Now())))
schedulerCache := internalcache.New(30*time.Second, stopCh)
schedulerCache := internalcache.New(30*time.Second, ctx.Done())

queue.Add(testPod)
queue.Pop()
@@ -283,9 +286,13 @@ func TestDefaultErrorFunc(t *testing.T) {
queue.Delete(testPod)
}

s, fwk, err := initScheduler(ctx.Done(), schedulerCache, queue, client, informerFactory)
if err != nil {
t.Fatal(err)
}

testPodInfo := &framework.QueuedPodInfo{PodInfo: framework.NewPodInfo(testPod)}
errFunc := MakeDefaultErrorFunc(client, podInformer.Lister(), queue, schedulerCache)
errFunc(testPodInfo, tt.injectErr)
s.FailureHandler(ctx, fwk, testPodInfo, tt.injectErr, v1.PodReasonUnschedulable, nil)

var got *v1.Pod
if tt.podUpdatedDuringScheduling {
@@ -305,7 +312,7 @@ func TestDefaultErrorFunc(t *testing.T) {
}
}

func TestDefaultErrorFunc_NodeNotFound(t *testing.T) {
func TestFailureHandler_NodeNotFound(t *testing.T) {
nodeFoo := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
nodeBar := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}
testPod := st.MakePod().Name("test-pod").Namespace(v1.NamespaceDefault).Obj()
@@ -354,9 +361,13 @@ func TestDefaultErrorFunc_NodeNotFound(t *testing.T) {
}
}

s, fwk, err := initScheduler(ctx.Done(), schedulerCache, queue, client, informerFactory)
if err != nil {
t.Fatal(err)
}

testPodInfo := &framework.QueuedPodInfo{PodInfo: framework.NewPodInfo(testPod)}
errFunc := MakeDefaultErrorFunc(client, podInformer.Lister(), queue, schedulerCache)
errFunc(testPodInfo, tt.injectErr)
s.FailureHandler(ctx, fwk, testPodInfo, tt.injectErr, v1.PodReasonUnschedulable, nil)

gotNodes := schedulerCache.Dump().Nodes
gotNodeNames := sets.NewString()
@@ -370,9 +381,9 @@ func TestDefaultErrorFunc_NodeNotFound(t *testing.T) {
}
}

func TestDefaultErrorFunc_PodAlreadyBound(t *testing.T) {
stopCh := make(chan struct{})
defer close(stopCh)
func TestFailureHandler_PodAlreadyBound(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

nodeFoo := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
testPod := st.MakePod().Name("test-pod").Namespace(v1.NamespaceDefault).Node("foo").Obj()
@@ -384,14 +395,18 @@ func TestDefaultErrorFunc_PodAlreadyBound(t *testing.T) {
podInformer.Informer().GetStore().Add(testPod)

queue := internalqueue.NewPriorityQueue(nil, informerFactory, internalqueue.WithClock(testingclock.NewFakeClock(time.Now())))
schedulerCache := internalcache.New(30*time.Second, stopCh)
schedulerCache := internalcache.New(30*time.Second, ctx.Done())

// Add node to schedulerCache no matter it's deleted in API server or not.
schedulerCache.AddNode(&nodeFoo)

s, fwk, err := initScheduler(ctx.Done(), schedulerCache, queue, client, informerFactory)
if err != nil {
t.Fatal(err)
}

testPodInfo := &framework.QueuedPodInfo{PodInfo: framework.NewPodInfo(testPod)}
errFunc := MakeDefaultErrorFunc(client, podInformer.Lister(), queue, schedulerCache)
errFunc(testPodInfo, fmt.Errorf("binding rejected: timeout"))
s.FailureHandler(ctx, fwk, testPodInfo, fmt.Errorf("binding rejected: timeout"), v1.PodReasonUnschedulable, nil)

pod := getPodFromPriorityQueue(queue, testPod)
if pod != nil {
@@ -425,3 +440,35 @@ func getPodFromPriorityQueue(queue *internalqueue.PriorityQueue, pod *v1.Pod) *v

return nil
}

func initScheduler(stop <-chan struct{}, cache internalcache.Cache, queue internalqueue.SchedulingQueue,
client kubernetes.Interface, informerFactory informers.SharedInformerFactory) (*Scheduler, framework.Framework, error) {
registerPluginFuncs := []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
}
eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()})
fwk, err := st.NewFramework(registerPluginFuncs,
testSchedulerName,
frameworkruntime.WithClientSet(client),
frameworkruntime.WithInformerFactory(informerFactory),
frameworkruntime.WithEventRecorder(eventBroadcaster.NewRecorder(scheme.Scheme, testSchedulerName)),
)
if err != nil {
return nil, nil, err
}

s := newScheduler(
cache,
nil,
nil,
stop,
queue,
profile.Map{testSchedulerName: fwk},
client,
nil,
0,
)

return s, fwk, nil
}
@@ -105,7 +105,7 @@ func TestCoreResourceEnqueue(t *testing.T) {
if fitError == nil {
t.Fatalf("Expect Pod %v to fail at scheduling.", podInfo.Pod.Name)
}
testCtx.Scheduler.Error(podInfo, fitError)
testCtx.Scheduler.FailureHandler(ctx, fwk, podInfo, fitError, v1.PodReasonUnschedulable, nil)
}

// Trigger a NodeTaintChange event.
@@ -280,7 +280,7 @@ func TestCustomResourceEnqueue(t *testing.T) {
if fitError == nil {
t.Fatalf("Expect Pod %v to fail at scheduling.", podInfo.Pod.Name)
}
testCtx.Scheduler.Error(podInfo, fitError)
testCtx.Scheduler.FailureHandler(ctx, fwk, podInfo, fitError, v1.PodReasonUnschedulable, nil)

// Scheduling cycle is incremented from 0 to 1 after NextPod() is called, so
// pass a number larger than 1 to move Pod to unschedulablePods.