Merge pull request #91535 from ahg-g/ahg-merge-updates

Merge pod condition update with setting nominated node name
Authored by Kubernetes Prow Robot on 2020-05-29 08:05:16 -07:00; committed by GitHub
commit 081f97aa2b
2 changed files with 101 additions and 124 deletions
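In effect, a scheduling failure now results in a single status patch that carries both the PodScheduled condition and, when preemption nominated a node, status.nominatedNodeName, rather than one API call per field as before. A minimal, self-contained sketch of that combined patch construction, reusing the same apimachinery and pod-util helpers the diff uses (the buildStatusPatch name and the pod values are illustrative, not part of the change):

```go
package main

import (
	"encoding/json"
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/strategicpatch"
	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
)

// buildStatusPatch mirrors the new updatePod/patchPod helpers: apply the condition
// and (optionally) the nominated node name to a copy of the pod, then compute one
// strategic merge patch for the status subresource. The helper name is illustrative.
func buildStatusPatch(pod *v1.Pod, condition *v1.PodCondition, nominatedNode string) ([]byte, error) {
	podCopy := pod.DeepCopy()
	conditionChanged := podutil.UpdatePodCondition(&podCopy.Status, condition)
	nominationChanged := nominatedNode != "" && pod.Status.NominatedNodeName != nominatedNode
	if !conditionChanged && !nominationChanged {
		return nil, nil // nothing worth a round trip to the API server
	}
	if nominatedNode != "" {
		podCopy.Status.NominatedNodeName = nominatedNode
	}
	oldData, err := json.Marshal(pod)
	if err != nil {
		return nil, err
	}
	newData, err := json.Marshal(podCopy)
	if err != nil {
		return nil, err
	}
	return strategicpatch.CreateTwoWayMergePatch(oldData, newData, &v1.Pod{})
}

func main() {
	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "default"}}
	patch, err := buildStatusPatch(pod, &v1.PodCondition{
		Type:    v1.PodScheduled,
		Status:  v1.ConditionFalse,
		Reason:  v1.PodReasonUnschedulable,
		Message: "example failure message",
	}, "node1")
	if err != nil {
		panic(err)
	}
	// One patch body now carries both the PodScheduled condition and the nomination.
	fmt.Println(string(patch))
}
```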


@@ -56,20 +56,12 @@ const (
pluginMetricsSamplePercent = 10
)
// podConditionUpdater updates the condition of a pod based on the passed
// PodCondition
// TODO (ahmad-diaa): Remove type and replace it with scheduler methods
type podConditionUpdater interface {
update(pod *v1.Pod, podCondition *v1.PodCondition) error
}
// PodPreemptor has methods needed to delete a pod and to update 'NominatedPod'
// field of the preemptor pod.
// TODO (ahmad-diaa): Remove type and replace it with scheduler methods
type podPreemptor interface {
getUpdatedPod(pod *v1.Pod) (*v1.Pod, error)
deletePod(pod *v1.Pod) error
setNominatedNodeName(pod *v1.Pod, nominatedNode string) error
removeNominatedNodeName(pod *v1.Pod) error
}
@@ -81,10 +73,6 @@ type Scheduler struct {
SchedulerCache internalcache.Cache
Algorithm core.ScheduleAlgorithm
// PodConditionUpdater is used only in case of scheduling errors. If we succeed
// with scheduling, PodScheduled condition will be updated in apiserver in /bind
// handler so that binding and setting PodCondition it is atomic.
podConditionUpdater podConditionUpdater
// PodPreemptor is used to evict pods and update 'NominatedNode' field of
// the preemptor pod.
podPreemptor podPreemptor
@@ -112,6 +100,8 @@ type Scheduler struct {
Profiles profile.Map
scheduledPodsHasSynced func() bool
client clientset.Interface
}
// Cache returns the cache in scheduler for test to check the data in scheduler.
@@ -312,7 +302,7 @@ func New(client clientset.Interface,
// Additional tweaks to the config produced by the configurator.
sched.DisablePreemption = options.disablePreemption
sched.StopEverything = stopEverything
sched.podConditionUpdater = &podConditionUpdaterImpl{client}
sched.client = client
sched.podPreemptor = &podPreemptorImpl{client}
sched.scheduledPodsHasSynced = podInformer.Informer().HasSynced
@@ -366,23 +356,46 @@ func (sched *Scheduler) Run(ctx context.Context) {
sched.SchedulingQueue.Close()
}
// recordFailedSchedulingEvent records an event for the pod that indicates the
// pod has failed to schedule.
// NOTE: This function modifies "pod". "pod" should be copied before being passed.
func (sched *Scheduler) recordSchedulingFailure(prof *profile.Profile, podInfo *framework.QueuedPodInfo, err error, reason string, message string) {
// recordSchedulingFailure records an event for the pod that indicates the
// pod has failed to schedule. Also, update the pod condition and nominated node name if set.
func (sched *Scheduler) recordSchedulingFailure(prof *profile.Profile, podInfo *framework.QueuedPodInfo, err error, reason string, nominatedNode string) {
sched.Error(podInfo, err)
// Update the scheduling queue with the nominated pod information. Without
// this, there would be a race condition between the next scheduling cycle
// and the time the scheduler receives a Pod Update for the nominated pod.
// Here we check for nil only for tests.
if sched.SchedulingQueue != nil {
sched.SchedulingQueue.AddNominatedPod(podInfo.Pod, nominatedNode)
}
pod := podInfo.Pod
prof.Recorder.Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", message)
if err := sched.podConditionUpdater.update(pod, &v1.PodCondition{
prof.Recorder.Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", err.Error())
if err := updatePod(sched.client, pod, &v1.PodCondition{
Type: v1.PodScheduled,
Status: v1.ConditionFalse,
Reason: reason,
Message: err.Error(),
}); err != nil {
klog.Errorf("Error updating the condition of the pod %s/%s: %v", pod.Namespace, pod.Name, err)
}, nominatedNode); err != nil {
klog.Errorf("Error updating pod %s/%s: %v", pod.Namespace, pod.Name, err)
}
}
func updatePod(client clientset.Interface, pod *v1.Pod, condition *v1.PodCondition, nominatedNode string) error {
klog.V(3).Infof("Updating pod condition for %s/%s to (%s==%s, Reason=%s)", pod.Namespace, pod.Name, condition.Type, condition.Status, condition.Reason)
podCopy := pod.DeepCopy()
// NominatedNodeName is updated only if we are trying to set it, and the value is
// different from the existing one.
if !podutil.UpdatePodCondition(&podCopy.Status, condition) &&
(len(nominatedNode) == 0 || pod.Status.NominatedNodeName == nominatedNode) {
return nil
}
if nominatedNode != "" {
podCopy.Status.NominatedNodeName = nominatedNode
}
return patchPod(client, pod, podCopy)
}
// preempt tries to create room for a pod that has failed to schedule, by preempting lower priority pods if possible.
// If it succeeds, it adds the name of the node where preemption has happened to the pod spec.
// It returns the node name and an error if any.
@@ -399,19 +412,6 @@ func (sched *Scheduler) preempt(ctx context.Context, prof *profile.Profile, stat
return "", err
}
if len(nodeName) != 0 {
// Update the scheduling queue with the nominated pod information. Without
// this, there would be a race condition between the next scheduling cycle
// and the time the scheduler receives a Pod Update for the nominated pod.
sched.SchedulingQueue.AddNominatedPod(preemptor, nodeName)
// Make a call to update nominated node name of the pod on the API server.
err = sched.podPreemptor.setNominatedNodeName(preemptor, nodeName)
if err != nil {
klog.Errorf("Error in preemption process. Cannot set 'NominatedNodeName' on pod %v/%v: %v", preemptor.Namespace, preemptor.Name, err)
sched.SchedulingQueue.DeleteNominatedPodIfExists(preemptor)
return "", err
}
for _, victim := range victims {
if err := sched.podPreemptor.deletePod(victim); err != nil {
klog.Errorf("Error preempting pod %v/%v: %v", victim.Namespace, victim.Name, err)
@@ -549,13 +549,14 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
// preempt, with the expectation that the next time the pod is tried for scheduling it
// will fit due to the preemption. It is also possible that a different pod will schedule
// into the resources that were preempted, but this is harmless.
nominatedNode := ""
if fitError, ok := err.(*core.FitError); ok {
if sched.DisablePreemption {
klog.V(3).Infof("Pod priority feature is not enabled or preemption is disabled by scheduler configuration." +
" No preemption is performed.")
} else {
preemptionStartTime := time.Now()
sched.preempt(schedulingCycleCtx, prof, state, pod, fitError)
nominatedNode, _ = sched.preempt(schedulingCycleCtx, prof, state, pod, fitError)
metrics.PreemptionAttempts.Inc()
metrics.SchedulingAlgorithmPreemptionEvaluationDuration.Observe(metrics.SinceInSeconds(preemptionStartTime))
metrics.DeprecatedSchedulingDuration.WithLabelValues(metrics.PreemptionEvaluation).Observe(metrics.SinceInSeconds(preemptionStartTime))
@@ -571,7 +572,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
klog.Errorf("error selecting node for pod: %v", err)
metrics.PodScheduleErrors.Inc()
}
sched.recordSchedulingFailure(prof, podInfo.DeepCopy(), err, v1.PodReasonUnschedulable, err.Error())
sched.recordSchedulingFailure(prof, podInfo, err, v1.PodReasonUnschedulable, nominatedNode)
return
}
metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInSeconds(start))
@@ -582,7 +583,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
// Run "reserve" plugins.
if sts := prof.RunReservePlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {
sched.recordSchedulingFailure(prof, assumedPodInfo, sts.AsError(), SchedulerError, sts.Message())
sched.recordSchedulingFailure(prof, assumedPodInfo, sts.AsError(), SchedulerError, "")
metrics.PodScheduleErrors.Inc()
return
}
@@ -595,7 +596,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
// This relies on the fact that Error will check if the pod has been bound
// to a node and if so will not add it back to the unscheduled pods queue
// (otherwise this would cause an infinite loop).
sched.recordSchedulingFailure(prof, assumedPodInfo, err, SchedulerError, fmt.Sprintf("AssumePod failed: %v", err))
sched.recordSchedulingFailure(prof, assumedPodInfo, err, SchedulerError, "")
metrics.PodScheduleErrors.Inc()
// trigger un-reserve plugins to clean up state associated with the reserved Pod
prof.RunUnreservePlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
@@ -618,7 +619,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
}
// One of the plugins returned status different than success or wait.
prof.RunUnreservePlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
sched.recordSchedulingFailure(prof, assumedPodInfo, runPermitStatus.AsError(), reason, runPermitStatus.Message())
sched.recordSchedulingFailure(prof, assumedPodInfo, runPermitStatus.AsError(), reason, "")
return
}
@@ -644,7 +645,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
}
// trigger un-reserve plugins to clean up state associated with the reserved Pod
prof.RunUnreservePlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
sched.recordSchedulingFailure(prof, assumedPodInfo, waitOnPermitStatus.AsError(), reason, waitOnPermitStatus.Message())
sched.recordSchedulingFailure(prof, assumedPodInfo, waitOnPermitStatus.AsError(), reason, "")
return
}
@@ -659,7 +660,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
}
// trigger un-reserve plugins to clean up state associated with the reserved Pod
prof.RunUnreservePlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
sched.recordSchedulingFailure(prof, assumedPodInfo, preBindStatus.AsError(), reason, preBindStatus.Message())
sched.recordSchedulingFailure(prof, assumedPodInfo, preBindStatus.AsError(), reason, "")
return
}
@@ -669,7 +670,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
metrics.PodScheduleErrors.Inc()
// trigger un-reserve plugins to clean up state associated with the reserved Pod
prof.RunUnreservePlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
sched.recordSchedulingFailure(prof, assumedPodInfo, err, SchedulerError, fmt.Sprintf("Binding rejected: %v", err))
sched.recordSchedulingFailure(prof, assumedPodInfo, fmt.Errorf("Binding rejected: %v", err), SchedulerError, "")
} else {
// Calculating nodeResourceString can be heavy. Avoid it if klog verbosity is below 2.
if klog.V(2).Enabled() {
@@ -713,31 +714,6 @@ func (sched *Scheduler) skipPodSchedule(prof *profile.Profile, pod *v1.Pod) bool
return false
}
type podConditionUpdaterImpl struct {
Client clientset.Interface
}
func (p *podConditionUpdaterImpl) update(pod *v1.Pod, condition *v1.PodCondition) error {
klog.V(3).Infof("Updating pod condition for %s/%s to (%s==%s, Reason=%s)", pod.Namespace, pod.Name, condition.Type, condition.Status, condition.Reason)
oldData, err := json.Marshal(pod)
if err != nil {
return err
}
if !podutil.UpdatePodCondition(&pod.Status, condition) {
return nil
}
newData, err := json.Marshal(pod)
if err != nil {
return err
}
patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, &v1.Pod{})
if err != nil {
return fmt.Errorf("failed to create merge patch for pod %q/%q: %v", pod.Namespace, pod.Name, err)
}
_, err = p.Client.CoreV1().Pods(pod.Namespace).Patch(context.TODO(), pod.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status")
return err
}
type podPreemptorImpl struct {
Client clientset.Interface
}
@@ -750,36 +726,33 @@ func (p *podPreemptorImpl) deletePod(pod *v1.Pod) error {
return p.Client.CoreV1().Pods(pod.Namespace).Delete(context.TODO(), pod.Name, metav1.DeleteOptions{})
}
func (p *podPreemptorImpl) setNominatedNodeName(pod *v1.Pod, nominatedNodeName string) error {
klog.V(3).Infof("Setting nominated node name for %s/%s to \"%s\"", pod.Namespace, pod.Name, nominatedNodeName)
if pod.Status.NominatedNodeName == nominatedNodeName {
func (p *podPreemptorImpl) removeNominatedNodeName(pod *v1.Pod) error {
if len(pod.Status.NominatedNodeName) == 0 {
return nil
}
podCopy := pod.DeepCopy()
oldData, err := json.Marshal(podCopy)
podCopy.Status.NominatedNodeName = ""
return patchPod(p.Client, pod, podCopy)
}
func patchPod(client clientset.Interface, old *v1.Pod, new *v1.Pod) error {
oldData, err := json.Marshal(old)
if err != nil {
return err
}
podCopy.Status.NominatedNodeName = nominatedNodeName
newData, err := json.Marshal(podCopy)
newData, err := json.Marshal(new)
if err != nil {
return err
}
patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, &v1.Pod{})
if err != nil {
return fmt.Errorf("failed to create merge patch for pod %q/%q: %v", pod.Namespace, pod.Name, err)
return fmt.Errorf("failed to create merge patch for pod %q/%q: %v", old.Namespace, old.Name, err)
}
_, err = p.Client.CoreV1().Pods(pod.Namespace).Patch(context.TODO(), pod.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status")
_, err = client.CoreV1().Pods(old.Namespace).Patch(context.TODO(), old.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status")
return err
}
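For intuition, the patch bodies produced for nominated-node changes are minimal: setting a node patches only that field, and clearing it sends null, since a two-way strategic merge patch deletes a field by nulling it. Both shapes match the expected patch data asserted in the tests further below. A small sketch under those assumptions (the nominationPatch helper and pod values are illustrative):

```go
package main

import (
	"encoding/json"
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/strategicpatch"
)

// nominationPatch is illustrative only: it computes the strategic merge patch
// produced when only status.nominatedNodeName changes between two pods.
func nominationPatch(current, desired string) string {
	before := &v1.Pod{Status: v1.PodStatus{NominatedNodeName: current}}
	after := before.DeepCopy()
	after.Status.NominatedNodeName = desired
	oldData, _ := json.Marshal(before)
	newData, _ := json.Marshal(after)
	patch, _ := strategicpatch.CreateTwoWayMergePatch(oldData, newData, &v1.Pod{})
	return string(patch)
}

func main() {
	fmt.Println(nominationPatch("", "node1")) // {"status":{"nominatedNodeName":"node1"}}
	fmt.Println(nominationPatch("node1", "")) // {"status":{"nominatedNodeName":null}}
}
```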
func (p *podPreemptorImpl) removeNominatedNodeName(pod *v1.Pod) error {
if len(pod.Status.NominatedNodeName) == 0 {
return nil
}
return p.setNominatedNodeName(pod, "")
}
func defaultAlgorithmSourceProviderName() *string {
provider := schedulerapi.SchedulerDefaultProviderName
return &provider


@@ -65,12 +65,6 @@ import (
st "k8s.io/kubernetes/pkg/scheduler/testing"
)
type fakePodConditionUpdater struct{}
func (fc fakePodConditionUpdater) update(pod *v1.Pod, podCondition *v1.PodCondition) error {
return nil
}
type fakePodPreemptor struct{}
func (fp fakePodPreemptor) getUpdatedPod(pod *v1.Pod) (*v1.Pod, error) {
@@ -81,10 +75,6 @@ func (fp fakePodPreemptor) deletePod(pod *v1.Pod) error {
return nil
}
func (fp fakePodPreemptor) setNominatedNodeName(pod *v1.Pod, nomNodeName string) error {
return nil
}
func (fp fakePodPreemptor) removeNominatedNodeName(pod *v1.Pod) error {
return nil
}
@@ -277,7 +267,7 @@ func TestSchedulerScheduleOne(t *testing.T) {
expectBind: &v1.Binding{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: types.UID("foo")}, Target: v1.ObjectReference{Kind: "Node", Name: testNode.Name}},
expectAssumedPod: podWithID("foo", testNode.Name),
injectBindError: errB,
expectError: errors.New("plugin \"DefaultBinder\" failed to bind pod \"/foo\": binder"),
expectError: errors.New("Binding rejected: plugin \"DefaultBinder\" failed to bind pod \"/foo\": binder"),
expectErrorPod: podWithID("foo", testNode.Name),
expectForgetPod: podWithID("foo", testNode.Name),
eventReason: "FailedScheduling",
@@ -334,9 +324,9 @@ func TestSchedulerScheduleOne(t *testing.T) {
}
s := &Scheduler{
SchedulerCache: sCache,
Algorithm: item.algo,
podConditionUpdater: fakePodConditionUpdater{},
SchedulerCache: sCache,
Algorithm: item.algo,
client: client,
Error: func(p *framework.QueuedPodInfo, err error) {
gotPod = p.Pod
gotError = err
@@ -828,9 +818,9 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C
Error: func(p *framework.QueuedPodInfo, err error) {
errChan <- err
},
Profiles: profiles,
podConditionUpdater: fakePodConditionUpdater{},
podPreemptor: fakePodPreemptor{},
Profiles: profiles,
client: client,
podPreemptor: fakePodPreemptor{},
}
return sched, bindingChan, errChan
@@ -1316,7 +1306,7 @@ func TestInjectingPluginConfigForVolumeBinding(t *testing.T) {
}
}
func TestSetNominatedNodeName(t *testing.T) {
func TestRemoveNominatedNodeName(t *testing.T) {
tests := []struct {
name string
currentNominatedNodeName string
@@ -1324,30 +1314,15 @@ func TestSetNominatedNodeName(t *testing.T) {
expectedPatchRequests int
expectedPatchData string
}{
{
name: "Should make patch request to set node name",
currentNominatedNodeName: "",
newNominatedNodeName: "node1",
expectedPatchRequests: 1,
expectedPatchData: `{"status":{"nominatedNodeName":"node1"}}`,
},
{
name: "Should make patch request to clear node name",
currentNominatedNodeName: "node1",
newNominatedNodeName: "",
expectedPatchRequests: 1,
expectedPatchData: `{"status":{"nominatedNodeName":null}}`,
},
{
name: "Should not make patch request if nominated node is already set to the specified value",
currentNominatedNodeName: "node1",
newNominatedNodeName: "node1",
expectedPatchRequests: 0,
},
{
name: "Should not make patch request if nominated node is already cleared",
currentNominatedNodeName: "",
newNominatedNodeName: "",
expectedPatchRequests: 0,
},
}
@@ -1371,8 +1346,8 @@ func TestSetNominatedNodeName(t *testing.T) {
}
preemptor := &podPreemptorImpl{Client: cs}
if err := preemptor.setNominatedNodeName(pod, test.newNominatedNodeName); err != nil {
t.Fatalf("Error calling setNominatedNodeName: %v", err)
if err := preemptor.removeNominatedNodeName(pod); err != nil {
t.Fatalf("Error calling removeNominatedNodeName: %v", err)
}
if actualPatchRequests != test.expectedPatchRequests {
@@ -1386,11 +1361,13 @@ func TestSetNominatedNodeName(t *testing.T) {
}
}
func TestUpdatePodCondition(t *testing.T) {
func TestUpdatePod(t *testing.T) {
tests := []struct {
name string
currentPodConditions []v1.PodCondition
newPodCondition *v1.PodCondition
currentNominatedNodeName string
newNominatedNodeName string
expectedPatchRequests int
expectedPatchDataPattern string
}{
@@ -1478,7 +1455,7 @@ func TestUpdatePodCondition(t *testing.T) {
expectedPatchDataPattern: `{"status":{"\$setElementOrder/conditions":\[{"type":"currentType"}],"conditions":\[{"lastProbeTime":"2020-05-13T01:01:01Z","message":"newMessage","reason":"newReason","type":"currentType"}]}}`,
},
{
name: "Should not make patch request if pod condition already exists and is identical",
name: "Should not make patch request if pod condition already exists and is identical and nominated node name is not set",
currentPodConditions: []v1.PodCondition{
{
Type: "currentType",
@@ -1497,7 +1474,32 @@ func TestUpdatePodCondition(t *testing.T) {
Reason: "currentReason",
Message: "currentMessage",
},
expectedPatchRequests: 0,
currentNominatedNodeName: "node1",
expectedPatchRequests: 0,
},
{
name: "Should make patch request if pod condition already exists and is identical but nominated node name is set and different",
currentPodConditions: []v1.PodCondition{
{
Type: "currentType",
Status: "currentStatus",
LastProbeTime: metav1.NewTime(time.Date(2020, 5, 13, 0, 0, 0, 0, time.UTC)),
LastTransitionTime: metav1.NewTime(time.Date(2020, 5, 12, 0, 0, 0, 0, time.UTC)),
Reason: "currentReason",
Message: "currentMessage",
},
},
newPodCondition: &v1.PodCondition{
Type: "currentType",
Status: "currentStatus",
LastProbeTime: metav1.NewTime(time.Date(2020, 5, 13, 0, 0, 0, 0, time.UTC)),
LastTransitionTime: metav1.NewTime(time.Date(2020, 5, 12, 0, 0, 0, 0, time.UTC)),
Reason: "currentReason",
Message: "currentMessage",
},
newNominatedNodeName: "node1",
expectedPatchRequests: 1,
expectedPatchDataPattern: `{"status":{"nominatedNodeName":"node1"}}`,
},
}
for _, test := range tests {
@@ -1516,16 +1518,18 @@ func TestUpdatePodCondition(t *testing.T) {
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{Name: "foo"},
Status: v1.PodStatus{Conditions: test.currentPodConditions},
Status: v1.PodStatus{
Conditions: test.currentPodConditions,
NominatedNodeName: test.currentNominatedNodeName,
},
}
updater := &podConditionUpdaterImpl{Client: cs}
if err := updater.update(pod, test.newPodCondition); err != nil {
if err := updatePod(cs, pod, test.newPodCondition, test.newNominatedNodeName); err != nil {
t.Fatalf("Error calling update: %v", err)
}
if actualPatchRequests != test.expectedPatchRequests {
t.Fatalf("Actual patch requests (%d) dos not equal expected patch requests (%d)", actualPatchRequests, test.expectedPatchRequests)
t.Fatalf("Actual patch requests (%d) does not equal expected patch requests (%d), actual patch data: %v", actualPatchRequests, test.expectedPatchRequests, actualPatchData)
}
regex, err := regexp.Compile(test.expectedPatchDataPattern)