diff --git a/pkg/scheduler/BUILD b/pkg/scheduler/BUILD
index 2830ef6b9d5..ecd9e3a125a 100644
--- a/pkg/scheduler/BUILD
+++ b/pkg/scheduler/BUILD
@@ -37,6 +37,7 @@ go_library(
         "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
+        "//staging/src/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
         "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
         "//staging/src/k8s.io/client-go/informers:go_default_library",
diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go
index b78d0397c8b..a0a217f25e5 100644
--- a/pkg/scheduler/scheduler.go
+++ b/pkg/scheduler/scheduler.go
@@ -18,6 +18,7 @@ package scheduler
 
 import (
 	"context"
+	"encoding/json"
 	"fmt"
 	"io/ioutil"
 	"math/rand"
@@ -27,6 +28,8 @@ import (
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/strategicpatch"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/informers"
 	coreinformers "k8s.io/client-go/informers/core/v1"
@@ -718,11 +721,23 @@ type podConditionUpdaterImpl struct {
 
 func (p *podConditionUpdaterImpl) update(pod *v1.Pod, condition *v1.PodCondition) error {
 	klog.V(3).Infof("Updating pod condition for %s/%s to (%s==%s, Reason=%s)", pod.Namespace, pod.Name, condition.Type, condition.Status, condition.Reason)
-	if podutil.UpdatePodCondition(&pod.Status, condition) {
-		_, err := p.Client.CoreV1().Pods(pod.Namespace).UpdateStatus(context.TODO(), pod, metav1.UpdateOptions{})
+	oldData, err := json.Marshal(pod)
+	if err != nil {
 		return err
 	}
-	return nil
+	if !podutil.UpdatePodCondition(&pod.Status, condition) {
+		return nil
+	}
+	newData, err := json.Marshal(pod)
+	if err != nil {
+		return err
+	}
+	patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, &v1.Pod{})
+	if err != nil {
+		return fmt.Errorf("failed to create merge patch for pod %q/%q: %v", pod.Namespace, pod.Name, err)
+	}
+	_, err = p.Client.CoreV1().Pods(pod.Namespace).Patch(context.TODO(), pod.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status")
+	return err
 }
 
 type podPreemptorImpl struct {
@@ -738,9 +753,25 @@ func (p *podPreemptorImpl) deletePod(pod *v1.Pod) error {
 }
 
 func (p *podPreemptorImpl) setNominatedNodeName(pod *v1.Pod, nominatedNodeName string) error {
+	klog.V(3).Infof("Setting nominated node name for %s/%s to \"%s\"", pod.Namespace, pod.Name, nominatedNodeName)
+	if pod.Status.NominatedNodeName == nominatedNodeName {
+		return nil
+	}
 	podCopy := pod.DeepCopy()
+	oldData, err := json.Marshal(podCopy)
+	if err != nil {
+		return err
+	}
 	podCopy.Status.NominatedNodeName = nominatedNodeName
-	_, err := p.Client.CoreV1().Pods(pod.Namespace).UpdateStatus(context.TODO(), podCopy, metav1.UpdateOptions{})
+	newData, err := json.Marshal(podCopy)
+	if err != nil {
+		return err
+	}
+	patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, &v1.Pod{})
+	if err != nil {
+		return fmt.Errorf("failed to create merge patch for pod %q/%q: %v", pod.Namespace, pod.Name, err)
+	}
+	_, err = p.Client.CoreV1().Pods(pod.Namespace).Patch(context.TODO(), pod.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status")
 	return err
 }
 
diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go
index 6f9cfef0ff3..a518158b1f6 100644
--- a/pkg/scheduler/scheduler_test.go
+++ b/pkg/scheduler/scheduler_test.go
@@ -24,6 +24,7 @@ import (
 	"os"
 	"path"
 	"reflect"
+	"regexp"
 	"sort"
 	"strings"
 	"sync"
@@ -1316,3 +1317,227 @@ func TestInjectingPluginConfigForVolumeBinding(t *testing.T) {
 		}
 	}
 }
+
+func TestSetNominatedNodeName(t *testing.T) {
+	tests := []struct {
+		name                     string
+		currentNominatedNodeName string
+		newNominatedNodeName     string
+		expectedPatchRequests    int
+		expectedPatchData        string
+	}{
+		{
+			name:                     "Should make patch request to set node name",
+			currentNominatedNodeName: "",
+			newNominatedNodeName:     "node1",
+			expectedPatchRequests:    1,
+			expectedPatchData:        `{"status":{"nominatedNodeName":"node1"}}`,
+		},
+		{
+			name:                     "Should make patch request to clear node name",
+			currentNominatedNodeName: "node1",
+			newNominatedNodeName:     "",
+			expectedPatchRequests:    1,
+			expectedPatchData:        `{"status":{"nominatedNodeName":null}}`,
+		},
+		{
+			name:                     "Should not make patch request if nominated node is already set to the specified value",
+			currentNominatedNodeName: "node1",
+			newNominatedNodeName:     "node1",
+			expectedPatchRequests:    0,
+		},
+		{
+			name:                     "Should not make patch request if nominated node is already cleared",
+			currentNominatedNodeName: "",
+			newNominatedNodeName:     "",
+			expectedPatchRequests:    0,
+		},
+	}
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			actualPatchRequests := 0
+			var actualPatchData string
+			cs := &clientsetfake.Clientset{}
+			cs.AddReactor("patch", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) {
+				actualPatchRequests++
+				patch := action.(clienttesting.PatchAction)
+				actualPatchData = string(patch.GetPatch())
+				// For this test, we don't care about the result of the patched pod, just that we got the expected
+				// patch request, so just returning &v1.Pod{} here is OK because scheduler doesn't use the response.
+				return true, &v1.Pod{}, nil
+			})
+
+			pod := &v1.Pod{
+				ObjectMeta: metav1.ObjectMeta{Name: "foo"},
+				Status:     v1.PodStatus{NominatedNodeName: test.currentNominatedNodeName},
+			}
+
+			preemptor := &podPreemptorImpl{Client: cs}
+			if err := preemptor.setNominatedNodeName(pod, test.newNominatedNodeName); err != nil {
+				t.Fatalf("Error calling setNominatedNodeName: %v", err)
+			}
+
+			if actualPatchRequests != test.expectedPatchRequests {
+				t.Fatalf("Actual patch requests (%d) does not equal expected patch requests (%d)", actualPatchRequests, test.expectedPatchRequests)
+			}
+
+			if test.expectedPatchRequests > 0 && actualPatchData != test.expectedPatchData {
+				t.Fatalf("Patch data mismatch: Actual was %v, but expected %v", actualPatchData, test.expectedPatchData)
+			}
+		})
+	}
+}
+
+func TestUpdatePodCondition(t *testing.T) {
+	tests := []struct {
+		name                     string
+		currentPodConditions     []v1.PodCondition
+		newPodCondition          *v1.PodCondition
+		expectedPatchRequests    int
+		expectedPatchDataPattern string
+	}{
+		{
+			name:                 "Should make patch request to add pod condition when there are none currently",
+			currentPodConditions: []v1.PodCondition{},
+			newPodCondition: &v1.PodCondition{
+				Type:               "newType",
+				Status:             "newStatus",
+				LastProbeTime:      metav1.NewTime(time.Date(2020, 5, 13, 1, 1, 1, 1, time.UTC)),
+				LastTransitionTime: metav1.NewTime(time.Date(2020, 5, 12, 1, 1, 1, 1, time.UTC)),
+				Reason:             "newReason",
+				Message:            "newMessage",
+			},
+			expectedPatchRequests:    1,
+			expectedPatchDataPattern: `{"status":{"conditions":\[{"lastProbeTime":"2020-05-13T01:01:01Z","lastTransitionTime":".*","message":"newMessage","reason":"newReason","status":"newStatus","type":"newType"}]}}`,
+		},
+		{
+			name: "Should make patch request to add a new pod condition when there is already one with another type",
+			currentPodConditions: []v1.PodCondition{
+				{
+					Type:               "someOtherType",
+					Status:             "someOtherTypeStatus",
+					LastProbeTime:      metav1.NewTime(time.Date(2020, 5, 11, 0, 0, 0, 0, time.UTC)),
+					LastTransitionTime: metav1.NewTime(time.Date(2020, 5, 10, 0, 0, 0, 0, time.UTC)),
+					Reason:             "someOtherTypeReason",
+					Message:            "someOtherTypeMessage",
+				},
+			},
+			newPodCondition: &v1.PodCondition{
+				Type:               "newType",
+				Status:             "newStatus",
+				LastProbeTime:      metav1.NewTime(time.Date(2020, 5, 13, 1, 1, 1, 1, time.UTC)),
+				LastTransitionTime: metav1.NewTime(time.Date(2020, 5, 12, 1, 1, 1, 1, time.UTC)),
+				Reason:             "newReason",
+				Message:            "newMessage",
+			},
+			expectedPatchRequests:    1,
+			expectedPatchDataPattern: `{"status":{"\$setElementOrder/conditions":\[{"type":"someOtherType"},{"type":"newType"}],"conditions":\[{"lastProbeTime":"2020-05-13T01:01:01Z","lastTransitionTime":".*","message":"newMessage","reason":"newReason","status":"newStatus","type":"newType"}]}}`,
+		},
+		{
+			name: "Should make patch request to update an existing pod condition",
+			currentPodConditions: []v1.PodCondition{
+				{
+					Type:               "currentType",
+					Status:             "currentStatus",
+					LastProbeTime:      metav1.NewTime(time.Date(2020, 5, 13, 0, 0, 0, 0, time.UTC)),
+					LastTransitionTime: metav1.NewTime(time.Date(2020, 5, 12, 0, 0, 0, 0, time.UTC)),
+					Reason:             "currentReason",
+					Message:            "currentMessage",
+				},
+			},
+			newPodCondition: &v1.PodCondition{
+				Type:               "currentType",
+				Status:             "newStatus",
+				LastProbeTime:      metav1.NewTime(time.Date(2020, 5, 13, 1, 1, 1, 1, time.UTC)),
+				LastTransitionTime: metav1.NewTime(time.Date(2020, 5, 12, 1, 1, 1, 1, time.UTC)),
+				Reason:             "newReason",
+				Message:            "newMessage",
+			},
+			expectedPatchRequests:    1,
+			expectedPatchDataPattern: `{"status":{"\$setElementOrder/conditions":\[{"type":"currentType"}],"conditions":\[{"lastProbeTime":"2020-05-13T01:01:01Z","lastTransitionTime":".*","message":"newMessage","reason":"newReason","status":"newStatus","type":"currentType"}]}}`,
+		},
+		{
+			name: "Should make patch request to update an existing pod condition, but the transition time should remain unchanged because the status is the same",
+			currentPodConditions: []v1.PodCondition{
+				{
+					Type:               "currentType",
+					Status:             "currentStatus",
+					LastProbeTime:      metav1.NewTime(time.Date(2020, 5, 13, 0, 0, 0, 0, time.UTC)),
+					LastTransitionTime: metav1.NewTime(time.Date(2020, 5, 12, 0, 0, 0, 0, time.UTC)),
+					Reason:             "currentReason",
+					Message:            "currentMessage",
+				},
+			},
+			newPodCondition: &v1.PodCondition{
+				Type:               "currentType",
+				Status:             "currentStatus",
+				LastProbeTime:      metav1.NewTime(time.Date(2020, 5, 13, 1, 1, 1, 1, time.UTC)),
+				LastTransitionTime: metav1.NewTime(time.Date(2020, 5, 12, 0, 0, 0, 0, time.UTC)),
+				Reason:             "newReason",
+				Message:            "newMessage",
+			},
+			expectedPatchRequests:    1,
+			expectedPatchDataPattern: `{"status":{"\$setElementOrder/conditions":\[{"type":"currentType"}],"conditions":\[{"lastProbeTime":"2020-05-13T01:01:01Z","message":"newMessage","reason":"newReason","type":"currentType"}]}}`,
+		},
+		{
+			name: "Should not make patch request if pod condition already exists and is identical",
+			currentPodConditions: []v1.PodCondition{
+				{
+					Type:               "currentType",
+					Status:             "currentStatus",
+					LastProbeTime:      metav1.NewTime(time.Date(2020, 5, 13, 0, 0, 0, 0, time.UTC)),
+					LastTransitionTime: metav1.NewTime(time.Date(2020, 5, 12, 0, 0, 0, 0, time.UTC)),
+					Reason:             "currentReason",
+					Message:            "currentMessage",
+				},
+			},
+			newPodCondition: &v1.PodCondition{
+				Type:               "currentType",
+				Status:             "currentStatus",
+				LastProbeTime:      metav1.NewTime(time.Date(2020, 5, 13, 0, 0, 0, 0, time.UTC)),
+				LastTransitionTime: metav1.NewTime(time.Date(2020, 5, 12, 0, 0, 0, 0, time.UTC)),
+				Reason:             "currentReason",
+				Message:            "currentMessage",
+			},
+			expectedPatchRequests: 0,
+		},
+	}
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			actualPatchRequests := 0
+			var actualPatchData string
+			cs := &clientsetfake.Clientset{}
+			cs.AddReactor("patch", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) {
+				actualPatchRequests++
+				patch := action.(clienttesting.PatchAction)
+				actualPatchData = string(patch.GetPatch())
+				// For this test, we don't care about the result of the patched pod, just that we got the expected
+				// patch request, so just returning &v1.Pod{} here is OK because scheduler doesn't use the response.
+				return true, &v1.Pod{}, nil
+			})
+
+			pod := &v1.Pod{
+				ObjectMeta: metav1.ObjectMeta{Name: "foo"},
+				Status:     v1.PodStatus{Conditions: test.currentPodConditions},
+			}
+
+			updater := &podConditionUpdaterImpl{Client: cs}
+			if err := updater.update(pod, test.newPodCondition); err != nil {
+				t.Fatalf("Error calling update: %v", err)
+			}
+
+			if actualPatchRequests != test.expectedPatchRequests {
+				t.Fatalf("Actual patch requests (%d) does not equal expected patch requests (%d)", actualPatchRequests, test.expectedPatchRequests)
+			}
+
+			regex, err := regexp.Compile(test.expectedPatchDataPattern)
+			if err != nil {
+				t.Fatalf("Error compiling regexp for %v: %v", test.expectedPatchDataPattern, err)
+			}
+
+			if test.expectedPatchRequests > 0 && !regex.MatchString(actualPatchData) {
+				t.Fatalf("Patch data mismatch: Actual was %v, but expected to match regexp %v", actualPatchData, test.expectedPatchDataPattern)
+			}
+		})
+	}
+}
diff --git a/test/integration/scheduler/preemption_test.go b/test/integration/scheduler/preemption_test.go
index 47cf213b73a..cf8d37d2d76 100644
--- a/test/integration/scheduler/preemption_test.go
+++ b/test/integration/scheduler/preemption_test.go
@@ -45,7 +45,7 @@ import (
 	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
 	"k8s.io/kubernetes/plugin/pkg/admission/priority"
 	testutils "k8s.io/kubernetes/test/integration/util"
-	utils "k8s.io/kubernetes/test/utils"
+	"k8s.io/kubernetes/test/utils"
 )
 
 var lowPriority, mediumPriority, highPriority = int32(100), int32(200), int32(300)
@@ -629,8 +629,6 @@ func mkPriorityPodWithGrace(tc *testutils.TestContext, name string, priority int
 		Labels:    map[string]string{"pod": name},
 		Resources: defaultPodRes,
 	})
-	// Setting grace period to zero. Otherwise, we may never see the actual deletion
-	// of the pods in integration tests.
 	pod.Spec.TerminationGracePeriodSeconds = &grace
 	return pod
 }
@@ -924,7 +922,7 @@ func TestNominatedNodeCleanUp(t *testing.T) {
 	}
 	// Step 5. Check that nominated node name of the high priority pod is set.
 	if err := waitForNominatedNodeName(cs, highPriPod); err != nil {
-		t.Errorf("NominatedNodeName annotation was not set for pod %v/%v: %v", medPriPod.Namespace, medPriPod.Name, err)
+		t.Errorf("NominatedNodeName annotation was not set for pod %v/%v: %v", highPriPod.Namespace, highPriPod.Name, err)
 	}
 	// And the nominated node name of the medium priority pod is cleared.
 	if err := wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {