diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go
index ab487f1bb2d..483e4e7bbc9 100644
--- a/pkg/scheduler/scheduler.go
+++ b/pkg/scheduler/scheduler.go
@@ -56,20 +56,12 @@ const (
 	pluginMetricsSamplePercent = 10
 )
 
-// podConditionUpdater updates the condition of a pod based on the passed
-// PodCondition
-// TODO (ahmad-diaa): Remove type and replace it with scheduler methods
-type podConditionUpdater interface {
-	update(pod *v1.Pod, podCondition *v1.PodCondition) error
-}
-
 // PodPreemptor has methods needed to delete a pod and to update 'NominatedPod'
 // field of the preemptor pod.
 // TODO (ahmad-diaa): Remove type and replace it with scheduler methods
 type podPreemptor interface {
 	getUpdatedPod(pod *v1.Pod) (*v1.Pod, error)
 	deletePod(pod *v1.Pod) error
-	setNominatedNodeName(pod *v1.Pod, nominatedNode string) error
 	removeNominatedNodeName(pod *v1.Pod) error
 }
 
@@ -81,10 +73,6 @@ type Scheduler struct {
 	SchedulerCache internalcache.Cache
 
 	Algorithm core.ScheduleAlgorithm
-	// PodConditionUpdater is used only in case of scheduling errors. If we succeed
-	// with scheduling, PodScheduled condition will be updated in apiserver in /bind
-	// handler so that binding and setting PodCondition it is atomic.
-	podConditionUpdater podConditionUpdater
 	// PodPreemptor is used to evict pods and update 'NominatedNode' field of
 	// the preemptor pod.
 	podPreemptor podPreemptor
@@ -112,6 +100,8 @@ type Scheduler struct {
 	Profiles profile.Map
 
 	scheduledPodsHasSynced func() bool
+
+	client clientset.Interface
 }
 
 // Cache returns the cache in scheduler for test to check the data in scheduler.
@@ -312,7 +302,7 @@ func New(client clientset.Interface,
 	// Additional tweaks to the config produced by the configurator.
 	sched.DisablePreemption = options.disablePreemption
 	sched.StopEverything = stopEverything
-	sched.podConditionUpdater = &podConditionUpdaterImpl{client}
+	sched.client = client
 	sched.podPreemptor = &podPreemptorImpl{client}
 	sched.scheduledPodsHasSynced = podInformer.Informer().HasSynced
 
@@ -366,23 +356,46 @@ func (sched *Scheduler) Run(ctx context.Context) {
 	sched.SchedulingQueue.Close()
 }
 
-// recordFailedSchedulingEvent records an event for the pod that indicates the
-// pod has failed to schedule.
-// NOTE: This function modifies "pod". "pod" should be copied before being passed.
-func (sched *Scheduler) recordSchedulingFailure(prof *profile.Profile, podInfo *framework.QueuedPodInfo, err error, reason string, message string) {
+// recordSchedulingFailure records an event for the pod that indicates the
+// pod has failed to schedule. Also, update the pod condition and nominated node name if set.
+func (sched *Scheduler) recordSchedulingFailure(prof *profile.Profile, podInfo *framework.QueuedPodInfo, err error, reason string, nominatedNode string) {
 	sched.Error(podInfo, err)
+
+	// Update the scheduling queue with the nominated pod information. Without
+	// this, there would be a race condition between the next scheduling cycle
+	// and the time the scheduler receives a Pod Update for the nominated pod.
+	// Here we check for nil only for tests.
+	if sched.SchedulingQueue != nil {
+		sched.SchedulingQueue.AddNominatedPod(podInfo.Pod, nominatedNode)
+	}
+
 	pod := podInfo.Pod
-	prof.Recorder.Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", message)
-	if err := sched.podConditionUpdater.update(pod, &v1.PodCondition{
+	prof.Recorder.Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", err.Error())
+	if err := updatePod(sched.client, pod, &v1.PodCondition{
 		Type:    v1.PodScheduled,
 		Status:  v1.ConditionFalse,
 		Reason:  reason,
 		Message: err.Error(),
-	}); err != nil {
-		klog.Errorf("Error updating the condition of the pod %s/%s: %v", pod.Namespace, pod.Name, err)
+	}, nominatedNode); err != nil {
+		klog.Errorf("Error updating pod %s/%s: %v", pod.Namespace, pod.Name, err)
 	}
 }
 
+func updatePod(client clientset.Interface, pod *v1.Pod, condition *v1.PodCondition, nominatedNode string) error {
+	klog.V(3).Infof("Updating pod condition for %s/%s to (%s==%s, Reason=%s)", pod.Namespace, pod.Name, condition.Type, condition.Status, condition.Reason)
+	podCopy := pod.DeepCopy()
+	// NominatedNodeName is updated only if we are trying to set it, and the value is
+	// different from the existing one.
+	if !podutil.UpdatePodCondition(&podCopy.Status, condition) &&
+		(len(nominatedNode) == 0 || pod.Status.NominatedNodeName == nominatedNode) {
+		return nil
+	}
+	if nominatedNode != "" {
+		podCopy.Status.NominatedNodeName = nominatedNode
+	}
+	return patchPod(client, pod, podCopy)
+}
+
 // preempt tries to create room for a pod that has failed to schedule, by preempting lower priority pods if possible.
 // If it succeeds, it adds the name of the node where preemption has happened to the pod spec.
 // It returns the node name and an error if any.
@@ -399,19 +412,6 @@ func (sched *Scheduler) preempt(ctx context.Context, prof *profile.Profile, stat
 		return "", err
 	}
 	if len(nodeName) != 0 {
-		// Update the scheduling queue with the nominated pod information. Without
-		// this, there would be a race condition between the next scheduling cycle
-		// and the time the scheduler receives a Pod Update for the nominated pod.
-		sched.SchedulingQueue.AddNominatedPod(preemptor, nodeName)
-
-		// Make a call to update nominated node name of the pod on the API server.
-		err = sched.podPreemptor.setNominatedNodeName(preemptor, nodeName)
-		if err != nil {
-			klog.Errorf("Error in preemption process. Cannot set 'NominatedNodeName' on pod %v/%v: %v", preemptor.Namespace, preemptor.Name, err)
-			sched.SchedulingQueue.DeleteNominatedPodIfExists(preemptor)
-			return "", err
-		}
-
 		for _, victim := range victims {
 			if err := sched.podPreemptor.deletePod(victim); err != nil {
 				klog.Errorf("Error preempting pod %v/%v: %v", victim.Namespace, victim.Name, err)
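Note on the consolidated write path above: `updatePod` folds what were two separate API round-trips (the PodScheduled condition update and the `setNominatedNodeName` call removed in the preempt hunk) into a single status patch, and it skips the API server entirely when there is nothing to change. The skip test is written in negated form in the code; the following standalone sketch (a hypothetical helper, not part of this patch) restates it positively:

```go
package main

import "fmt"

// needsPatch restates updatePod's skip condition in positive form: a patch is
// issued only when the pod condition actually changed, or when a non-empty
// nominated node is being set that differs from what the pod already records.
func needsPatch(conditionChanged bool, currentNominated, newNominated string) bool {
	return conditionChanged || (len(newNominated) != 0 && currentNominated != newNominated)
}

func main() {
	fmt.Println(needsPatch(false, "node1", ""))      // false: nothing changed, no node to set
	fmt.Println(needsPatch(false, "node1", "node1")) // false: same nominated node already set
	fmt.Println(needsPatch(false, "", "node1"))      // true: new nominated node to record
	fmt.Println(needsPatch(true, "", ""))            // true: condition changed
}
```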
+ " No preemption is performed.") } else { preemptionStartTime := time.Now() - sched.preempt(schedulingCycleCtx, prof, state, pod, fitError) + nominatedNode, _ = sched.preempt(schedulingCycleCtx, prof, state, pod, fitError) metrics.PreemptionAttempts.Inc() metrics.SchedulingAlgorithmPreemptionEvaluationDuration.Observe(metrics.SinceInSeconds(preemptionStartTime)) metrics.DeprecatedSchedulingDuration.WithLabelValues(metrics.PreemptionEvaluation).Observe(metrics.SinceInSeconds(preemptionStartTime)) @@ -571,7 +572,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) { klog.Errorf("error selecting node for pod: %v", err) metrics.PodScheduleErrors.Inc() } - sched.recordSchedulingFailure(prof, podInfo.DeepCopy(), err, v1.PodReasonUnschedulable, err.Error()) + sched.recordSchedulingFailure(prof, podInfo, err, v1.PodReasonUnschedulable, nominatedNode) return } metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInSeconds(start)) @@ -582,7 +583,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) { // Run "reserve" plugins. if sts := prof.RunReservePlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() { - sched.recordSchedulingFailure(prof, assumedPodInfo, sts.AsError(), SchedulerError, sts.Message()) + sched.recordSchedulingFailure(prof, assumedPodInfo, sts.AsError(), SchedulerError, "") metrics.PodScheduleErrors.Inc() return } @@ -595,7 +596,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) { // This relies on the fact that Error will check if the pod has been bound // to a node and if so will not add it back to the unscheduled pods queue // (otherwise this would cause an infinite loop). - sched.recordSchedulingFailure(prof, assumedPodInfo, err, SchedulerError, fmt.Sprintf("AssumePod failed: %v", err)) + sched.recordSchedulingFailure(prof, assumedPodInfo, err, SchedulerError, "") metrics.PodScheduleErrors.Inc() // trigger un-reserve plugins to clean up state associated with the reserved Pod prof.RunUnreservePlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) @@ -618,7 +619,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) { } // One of the plugins returned status different than success or wait. 
 		prof.RunUnreservePlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
-		sched.recordSchedulingFailure(prof, assumedPodInfo, runPermitStatus.AsError(), reason, runPermitStatus.Message())
+		sched.recordSchedulingFailure(prof, assumedPodInfo, runPermitStatus.AsError(), reason, "")
 		return
 	}
 
@@ -644,7 +645,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
 			}
 			// trigger un-reserve plugins to clean up state associated with the reserved Pod
 			prof.RunUnreservePlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
-			sched.recordSchedulingFailure(prof, assumedPodInfo, waitOnPermitStatus.AsError(), reason, waitOnPermitStatus.Message())
+			sched.recordSchedulingFailure(prof, assumedPodInfo, waitOnPermitStatus.AsError(), reason, "")
 			return
 		}
 
@@ -659,7 +660,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
 			}
 			// trigger un-reserve plugins to clean up state associated with the reserved Pod
 			prof.RunUnreservePlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
-			sched.recordSchedulingFailure(prof, assumedPodInfo, preBindStatus.AsError(), reason, preBindStatus.Message())
+			sched.recordSchedulingFailure(prof, assumedPodInfo, preBindStatus.AsError(), reason, "")
 			return
 		}
 
@@ -669,7 +670,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
 			metrics.PodScheduleErrors.Inc()
 			// trigger un-reserve plugins to clean up state associated with the reserved Pod
 			prof.RunUnreservePlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
-			sched.recordSchedulingFailure(prof, assumedPodInfo, err, SchedulerError, fmt.Sprintf("Binding rejected: %v", err))
+			sched.recordSchedulingFailure(prof, assumedPodInfo, fmt.Errorf("Binding rejected: %v", err), SchedulerError, "")
 		} else {
 			// Calculating nodeResourceString can be heavy. Avoid it if klog verbosity is below 2.
 			if klog.V(2).Enabled() {
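Note that every failure path in the hunks above now passes "" for nominatedNode: only the unschedulable (FitError) branch can produce one. The bind-failure hunk additionally stops passing a separate message string and instead wraps the bind error, which is what the changed expectError in scheduler_test.go below asserts. A runnable sketch of just that wrapping (the error string is copied from the test):

```go
package main

import (
	"errors"
	"fmt"
)

func main() {
	// The underlying bind error, as the test fixture constructs it.
	bindErr := errors.New(`plugin "DefaultBinder" failed to bind pod "/foo": binder`)

	// The hunk above wraps it, so both the FailedScheduling event and the pod
	// condition now carry the "Binding rejected:" prefix.
	err := fmt.Errorf("Binding rejected: %v", bindErr)
	fmt.Println(err)
	// Output: Binding rejected: plugin "DefaultBinder" failed to bind pod "/foo": binder
}
```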
@@ -713,31 +714,6 @@ func (sched *Scheduler) skipPodSchedule(prof *profile.Profile, pod *v1.Pod) bool
 	return false
 }
 
-type podConditionUpdaterImpl struct {
-	Client clientset.Interface
-}
-
-func (p *podConditionUpdaterImpl) update(pod *v1.Pod, condition *v1.PodCondition) error {
-	klog.V(3).Infof("Updating pod condition for %s/%s to (%s==%s, Reason=%s)", pod.Namespace, pod.Name, condition.Type, condition.Status, condition.Reason)
-	oldData, err := json.Marshal(pod)
-	if err != nil {
-		return err
-	}
-	if !podutil.UpdatePodCondition(&pod.Status, condition) {
-		return nil
-	}
-	newData, err := json.Marshal(pod)
-	if err != nil {
-		return err
-	}
-	patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, &v1.Pod{})
-	if err != nil {
-		return fmt.Errorf("failed to create merge patch for pod %q/%q: %v", pod.Namespace, pod.Name, err)
-	}
-	_, err = p.Client.CoreV1().Pods(pod.Namespace).Patch(context.TODO(), pod.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status")
-	return err
-}
-
 type podPreemptorImpl struct {
 	Client clientset.Interface
 }
@@ -750,36 +726,33 @@ func (p *podPreemptorImpl) deletePod(pod *v1.Pod) error {
 	return p.Client.CoreV1().Pods(pod.Namespace).Delete(context.TODO(), pod.Name, metav1.DeleteOptions{})
 }
 
-func (p *podPreemptorImpl) setNominatedNodeName(pod *v1.Pod, nominatedNodeName string) error {
-	klog.V(3).Infof("Setting nominated node name for %s/%s to \"%s\"", pod.Namespace, pod.Name, nominatedNodeName)
-	if pod.Status.NominatedNodeName == nominatedNodeName {
+func (p *podPreemptorImpl) removeNominatedNodeName(pod *v1.Pod) error {
+	if len(pod.Status.NominatedNodeName) == 0 {
 		return nil
 	}
 	podCopy := pod.DeepCopy()
-	oldData, err := json.Marshal(podCopy)
+	podCopy.Status.NominatedNodeName = ""
+	return patchPod(p.Client, pod, podCopy)
+}
+
+func patchPod(client clientset.Interface, old *v1.Pod, new *v1.Pod) error {
+	oldData, err := json.Marshal(old)
 	if err != nil {
 		return err
 	}
-	podCopy.Status.NominatedNodeName = nominatedNodeName
-	newData, err := json.Marshal(podCopy)
+
+	newData, err := json.Marshal(new)
 	if err != nil {
 		return err
 	}
 	patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, &v1.Pod{})
 	if err != nil {
-		return fmt.Errorf("failed to create merge patch for pod %q/%q: %v", pod.Namespace, pod.Name, err)
+		return fmt.Errorf("failed to create merge patch for pod %q/%q: %v", old.Namespace, old.Name, err)
 	}
-	_, err = p.Client.CoreV1().Pods(pod.Namespace).Patch(context.TODO(), pod.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status")
+	_, err = client.CoreV1().Pods(old.Namespace).Patch(context.TODO(), old.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status")
 	return err
 }
 
-func (p *podPreemptorImpl) removeNominatedNodeName(pod *v1.Pod) error {
-	if len(pod.Status.NominatedNodeName) == 0 {
-		return nil
-	}
-	return p.setNominatedNodeName(pod, "")
-}
-
 func defaultAlgorithmSourceProviderName() *string {
 	provider := schedulerapi.SchedulerDefaultProviderName
 	return &provider
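Before the test file: the expected patch payloads asserted below follow directly from how `patchPod` works — it marshals both pod snapshots and lets `CreateTwoWayMergePatch` compute the delta. A minimal standalone sketch (not part of the patch) reproducing both the set and the clear payloads:

```go
package main

import (
	"encoding/json"
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/strategicpatch"
)

// diffPods mirrors the marshal-and-diff core of patchPod, without the API call.
func diffPods(old, new *v1.Pod) string {
	oldData, _ := json.Marshal(old)
	newData, _ := json.Marshal(new)
	patch, _ := strategicpatch.CreateTwoWayMergePatch(oldData, newData, &v1.Pod{})
	return string(patch)
}

func main() {
	base := &v1.Pod{}

	// Setting a nominated node produces the payload TestUpdatePod expects.
	nominated := base.DeepCopy()
	nominated.Status.NominatedNodeName = "node1"
	fmt.Println(diffPods(base, nominated)) // {"status":{"nominatedNodeName":"node1"}}

	// Clearing it yields an explicit null, because the field is omitempty in
	// the new snapshot — hence TestRemoveNominatedNodeName's expected data.
	fmt.Println(diffPods(nominated, base)) // {"status":{"nominatedNodeName":null}}
}
```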
diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go
index c8a30ada825..529f97bfdb7 100644
--- a/pkg/scheduler/scheduler_test.go
+++ b/pkg/scheduler/scheduler_test.go
@@ -65,12 +65,6 @@ import (
 	st "k8s.io/kubernetes/pkg/scheduler/testing"
 )
 
-type fakePodConditionUpdater struct{}
-
-func (fc fakePodConditionUpdater) update(pod *v1.Pod, podCondition *v1.PodCondition) error {
-	return nil
-}
-
 type fakePodPreemptor struct{}
 
 func (fp fakePodPreemptor) getUpdatedPod(pod *v1.Pod) (*v1.Pod, error) {
@@ -81,10 +75,6 @@ func (fp fakePodPreemptor) deletePod(pod *v1.Pod) error {
 	return nil
 }
 
-func (fp fakePodPreemptor) setNominatedNodeName(pod *v1.Pod, nomNodeName string) error {
-	return nil
-}
-
 func (fp fakePodPreemptor) removeNominatedNodeName(pod *v1.Pod) error {
 	return nil
 }
@@ -277,7 +267,7 @@ func TestSchedulerScheduleOne(t *testing.T) {
 			expectBind:       &v1.Binding{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: types.UID("foo")}, Target: v1.ObjectReference{Kind: "Node", Name: testNode.Name}},
 			expectAssumedPod: podWithID("foo", testNode.Name),
 			injectBindError:  errB,
-			expectError:      errors.New("plugin \"DefaultBinder\" failed to bind pod \"/foo\": binder"),
+			expectError:      errors.New("Binding rejected: plugin \"DefaultBinder\" failed to bind pod \"/foo\": binder"),
 			expectErrorPod:   podWithID("foo", testNode.Name),
 			expectForgetPod:  podWithID("foo", testNode.Name),
 			eventReason:      "FailedScheduling",
@@ -334,9 +324,9 @@ func TestSchedulerScheduleOne(t *testing.T) {
 			}
 
 			s := &Scheduler{
-				SchedulerCache:      sCache,
-				Algorithm:           item.algo,
-				podConditionUpdater: fakePodConditionUpdater{},
+				SchedulerCache: sCache,
+				Algorithm:      item.algo,
+				client:         client,
 				Error: func(p *framework.QueuedPodInfo, err error) {
 					gotPod = p.Pod
 					gotError = err
@@ -828,9 +818,9 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C
 		Error: func(p *framework.QueuedPodInfo, err error) {
 			errChan <- err
 		},
-		Profiles:            profiles,
-		podConditionUpdater: fakePodConditionUpdater{},
-		podPreemptor:        fakePodPreemptor{},
+		Profiles:     profiles,
+		client:       client,
+		podPreemptor: fakePodPreemptor{},
 	}
 
 	return sched, bindingChan, errChan
@@ -1316,7 +1306,7 @@ func TestInjectingPluginConfigForVolumeBinding(t *testing.T) {
 	}
 }
 
-func TestSetNominatedNodeName(t *testing.T) {
+func TestRemoveNominatedNodeName(t *testing.T) {
 	tests := []struct {
 		name                     string
 		currentNominatedNodeName string
@@ -1324,30 +1314,15 @@ func TestSetNominatedNodeName(t *testing.T) {
 		expectedPatchRequests    int
 		expectedPatchData        string
 	}{
-		{
-			name:                     "Should make patch request to set node name",
-			currentNominatedNodeName: "",
-			newNominatedNodeName:     "node1",
-			expectedPatchRequests:    1,
-			expectedPatchData:        `{"status":{"nominatedNodeName":"node1"}}`,
-		},
 		{
 			name:                     "Should make patch request to clear node name",
 			currentNominatedNodeName: "node1",
-			newNominatedNodeName:     "",
 			expectedPatchRequests:    1,
 			expectedPatchData:        `{"status":{"nominatedNodeName":null}}`,
 		},
-		{
-			name:                     "Should not make patch request if nominated node is already set to the specified value",
-			currentNominatedNodeName: "node1",
-			newNominatedNodeName:     "node1",
-			expectedPatchRequests:    0,
-		},
 		{
 			name:                     "Should not make patch request if nominated node is already cleared",
 			currentNominatedNodeName: "",
-			newNominatedNodeName:     "",
 			expectedPatchRequests:    0,
 		},
 	}
@@ -1371,8 +1346,8 @@ func TestSetNominatedNodeName(t *testing.T) {
 			}
 
 			preemptor := &podPreemptorImpl{Client: cs}
-			if err := preemptor.setNominatedNodeName(pod, test.newNominatedNodeName); err != nil {
-				t.Fatalf("Error calling setNominatedNodeName: %v", err)
+			if err := preemptor.removeNominatedNodeName(pod); err != nil {
+				t.Fatalf("Error calling removeNominatedNodeName: %v", err)
 			}
 
 			if actualPatchRequests != test.expectedPatchRequests {
@@ -1386,11 +1361,13 @@ func TestSetNominatedNodeName(t *testing.T) {
 	}
 }
 
-func TestUpdatePodCondition(t *testing.T) {
+func TestUpdatePod(t *testing.T) {
 	tests := []struct {
 		name                 string
 		currentPodConditions []v1.PodCondition
 		newPodCondition      *v1.PodCondition
+		currentNominatedNodeName string
+		newNominatedNodeName     string
 		expectedPatchRequests    int
 		expectedPatchDataPattern string
 	}{
@@ -1478,7 +1455,7 @@ func TestUpdatePodCondition(t *testing.T) {
 			expectedPatchDataPattern: `{"status":{"\$setElementOrder/conditions":\[{"type":"currentType"}],"conditions":\[{"lastProbeTime":"2020-05-13T01:01:01Z","message":"newMessage","reason":"newReason","type":"currentType"}]}}`,
 		},
 		{
-			name: "Should not make patch request if pod condition already exists and is identical",
+			name: "Should not make patch request if pod condition already exists and is identical and nominated node name is not set",
 			currentPodConditions: []v1.PodCondition{
 				{
 					Type:               "currentType",
@@ -1497,7 +1474,32 @@ func TestUpdatePodCondition(t *testing.T) {
 				Reason:             "currentReason",
 				Message:            "currentMessage",
 			},
-			expectedPatchRequests: 0,
+			currentNominatedNodeName: "node1",
+			expectedPatchRequests:    0,
 		},
+		{
+			name: "Should make patch request if pod condition already exists and is identical but nominated node name is set and different",
+			currentPodConditions: []v1.PodCondition{
+				{
+					Type:               "currentType",
+					Status:             "currentStatus",
+					LastProbeTime:      metav1.NewTime(time.Date(2020, 5, 13, 0, 0, 0, 0, time.UTC)),
+					LastTransitionTime: metav1.NewTime(time.Date(2020, 5, 12, 0, 0, 0, 0, time.UTC)),
+					Reason:             "currentReason",
+					Message:            "currentMessage",
+				},
+			},
+			newPodCondition: &v1.PodCondition{
+				Type:               "currentType",
+				Status:             "currentStatus",
+				LastProbeTime:      metav1.NewTime(time.Date(2020, 5, 13, 0, 0, 0, 0, time.UTC)),
+				LastTransitionTime: metav1.NewTime(time.Date(2020, 5, 12, 0, 0, 0, 0, time.UTC)),
+				Reason:             "currentReason",
+				Message:            "currentMessage",
+			},
+			newNominatedNodeName:     "node1",
+			expectedPatchRequests:    1,
+			expectedPatchDataPattern: `{"status":{"nominatedNodeName":"node1"}}`,
+		},
 	}
 	for _, test := range tests {
@@ -1516,16 +1518,18 @@ func TestUpdatePodCondition(t *testing.T) {
 
 			pod := &v1.Pod{
 				ObjectMeta: metav1.ObjectMeta{Name: "foo"},
-				Status:     v1.PodStatus{Conditions: test.currentPodConditions},
+				Status: v1.PodStatus{
+					Conditions:        test.currentPodConditions,
+					NominatedNodeName: test.currentNominatedNodeName,
+				},
 			}
 
-			updater := &podConditionUpdaterImpl{Client: cs}
-			if err := updater.update(pod, test.newPodCondition); err != nil {
+			if err := updatePod(cs, pod, test.newPodCondition, test.newNominatedNodeName); err != nil {
 				t.Fatalf("Error calling update: %v", err)
 			}
 
 			if actualPatchRequests != test.expectedPatchRequests {
-				t.Fatalf("Actual patch requests (%d) dos not equal expected patch requests (%d)", actualPatchRequests, test.expectedPatchRequests)
+				t.Fatalf("Actual patch requests (%d) does not equal expected patch requests (%d), actual patch data: %v", actualPatchRequests, test.expectedPatchRequests, actualPatchData)
 			}
 
 			regex, err := regexp.Compile(test.expectedPatchDataPattern)
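Finally, a note on the harness these tests rely on: actualPatchRequests and actualPatchData are populated by a reactor registered on a fake clientset, which intercepts every pod patch before it reaches the default object tracker. A minimal sketch of that mechanism (the tests' exact registration may differ slightly; this only mirrors the pattern):

```go
package main

import (
	"context"
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/kubernetes/fake"
	clienttesting "k8s.io/client-go/testing"
)

func main() {
	patchRequests := 0
	var patchData string

	cs := fake.NewSimpleClientset()
	// Intercept every Pod patch, counting calls and capturing the payload —
	// the same observations the tests assert on via expectedPatchRequests
	// and expectedPatchData / expectedPatchDataPattern.
	cs.PrependReactor("patch", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) {
		patchRequests++
		patchData = string(action.(clienttesting.PatchAction).GetPatch())
		return true, &v1.Pod{}, nil
	})

	// Issue the same kind of status patch that patchPod sends.
	_, _ = cs.CoreV1().Pods("ns").Patch(context.TODO(), "foo", types.StrategicMergePatchType,
		[]byte(`{"status":{"nominatedNodeName":null}}`), metav1.PatchOptions{}, "status")

	fmt.Println(patchRequests, patchData) // 1 {"status":{"nominatedNodeName":null}}
}
```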