diff --git a/test/e2e/scheduling/BUILD b/test/e2e/scheduling/BUILD index 4c56b65b043..1269cb883dd 100644 --- a/test/e2e/scheduling/BUILD +++ b/test/e2e/scheduling/BUILD @@ -31,8 +31,10 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/fields:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/version:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", diff --git a/test/e2e/scheduling/preemption.go b/test/e2e/scheduling/preemption.go index fd853729d52..aa3d463f9f4 100644 --- a/test/e2e/scheduling/preemption.go +++ b/test/e2e/scheduling/preemption.go @@ -18,6 +18,7 @@ package scheduling import ( "context" + "encoding/json" "fmt" "strings" "sync/atomic" @@ -30,7 +31,9 @@ import ( "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/strategicpatch" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" clientset "k8s.io/client-go/kubernetes" @@ -40,7 +43,6 @@ import ( e2enode "k8s.io/kubernetes/test/e2e/framework/node" e2epod "k8s.io/kubernetes/test/e2e/framework/pod" e2ereplicaset "k8s.io/kubernetes/test/e2e/framework/replicaset" - e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper" "github.com/onsi/ginkgo" "github.com/onsi/gomega" @@ -77,8 +79,10 @@ var _ = SIGDescribe("SchedulerPreemption [Serial]", func() { cs.SchedulingV1().PriorityClasses().Delete(context.TODO(), pair.name, *metav1.NewDeleteOptions(0)) } for _, node := range nodeList.Items { - delete(node.Status.Capacity, testExtendedResource) - cs.CoreV1().Nodes().UpdateStatus(context.TODO(), &node, metav1.UpdateOptions{}) + nodeCopy := node.DeepCopy() + delete(nodeCopy.Status.Capacity, testExtendedResource) + err := patchNode(cs, &node, nodeCopy) + framework.ExpectNoError(err) } }) @@ -119,8 +123,9 @@ var _ = SIGDescribe("SchedulerPreemption [Serial]", func() { // Now create victim pods on each of the node with lower priority for i, node := range nodeList.Items { // Update each node to advertise 3 available extended resources - node.Status.Capacity[testExtendedResource] = resource.MustParse("3") - node, err := cs.CoreV1().Nodes().UpdateStatus(context.TODO(), &node, metav1.UpdateOptions{}) + nodeCopy := node.DeepCopy() + nodeCopy.Status.Capacity[testExtendedResource] = resource.MustParse("3") + err := patchNode(cs, &node, nodeCopy) framework.ExpectNoError(err) // Request 2 of the available resources for the victim pods @@ -204,8 +209,9 @@ var _ = SIGDescribe("SchedulerPreemption [Serial]", func() { pods := make([]*v1.Pod, 0, len(nodeList.Items)) for i, node := range nodeList.Items { // Update each node to advertise 3 available extended resources - node.Status.Capacity[testExtendedResource] = resource.MustParse("3") - node, err := cs.CoreV1().Nodes().UpdateStatus(context.TODO(), &node, metav1.UpdateOptions{}) + nodeCopy := node.DeepCopy() + nodeCopy.Status.Capacity[testExtendedResource] = resource.MustParse("3") + err := patchNode(cs, &node, nodeCopy) framework.ExpectNoError(err) // Request 2 of the available resources for the victim pods @@ -241,7 +247,7 @@ var _ = SIGDescribe("SchedulerPreemption [Serial]", func() { framework.Logf("Created pod: %v", pods[i].Name) } if len(pods) < 2 { - e2eskipper.Skipf("We need at least two pods to be created but" + + framework.Failf("We need at least two pods to be created but" + "all nodes are already heavily utilized, so preemption tests cannot be run") } ginkgo.By("Wait for pods to be scheduled.") @@ -305,12 +311,10 @@ var _ = SIGDescribe("SchedulerPreemption [Serial]", func() { node, err := cs.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{}) framework.ExpectNoError(err) // update Node API object with a fake resource - nodeCopy := node.DeepCopy() - // force it to update - nodeCopy.ResourceVersion = "0" ginkgo.By(fmt.Sprintf("Apply 10 fake resource to node %v.", node.Name)) + nodeCopy := node.DeepCopy() nodeCopy.Status.Capacity[fakeRes] = resource.MustParse("10") - node, err = cs.CoreV1().Nodes().UpdateStatus(context.TODO(), nodeCopy, metav1.UpdateOptions{}) + err = patchNode(cs, node, nodeCopy) framework.ExpectNoError(err) nodes = append(nodes, node) } @@ -321,10 +325,8 @@ var _ = SIGDescribe("SchedulerPreemption [Serial]", func() { } for _, node := range nodes { nodeCopy := node.DeepCopy() - // force it to update - nodeCopy.ResourceVersion = "0" delete(nodeCopy.Status.Capacity, fakeRes) - _, err := cs.CoreV1().Nodes().UpdateStatus(context.TODO(), nodeCopy, metav1.UpdateOptions{}) + err := patchNode(cs, node, nodeCopy) framework.ExpectNoError(err) } }) @@ -470,10 +472,8 @@ var _ = SIGDescribe("SchedulerPreemption [Serial]", func() { if node != nil { nodeCopy := node.DeepCopy() - // force it to update - nodeCopy.ResourceVersion = "0" delete(nodeCopy.Status.Capacity, fakecpu) - _, err := cs.CoreV1().Nodes().UpdateStatus(context.TODO(), nodeCopy, metav1.UpdateOptions{}) + err := patchNode(cs, node, nodeCopy) framework.ExpectNoError(err) } for _, pair := range priorityPairs { @@ -504,10 +504,8 @@ var _ = SIGDescribe("SchedulerPreemption [Serial]", func() { // update Node API object with a fake resource nodeCopy := node.DeepCopy() - // force it to update - nodeCopy.ResourceVersion = "0" nodeCopy.Status.Capacity[fakecpu] = resource.MustParse("1000") - node, err = cs.CoreV1().Nodes().UpdateStatus(context.TODO(), nodeCopy, metav1.UpdateOptions{}) + err = patchNode(cs, node, nodeCopy) framework.ExpectNoError(err) // create four PriorityClass: p1, p2, p3, p4 @@ -737,3 +735,21 @@ func waitForPreemptingWithTimeout(f *framework.Framework, pod *v1.Pod, timeout t }) framework.ExpectNoError(err, "pod %v/%v failed to preempt other pods", pod.Namespace, pod.Name) } + +func patchNode(client clientset.Interface, old *v1.Node, new *v1.Node) error { + oldData, err := json.Marshal(old) + if err != nil { + return err + } + + newData, err := json.Marshal(new) + if err != nil { + return err + } + patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, &v1.Node{}) + if err != nil { + return fmt.Errorf("failed to create merge patch for node %q: %v", old.Name, err) + } + _, err = client.CoreV1().Nodes().Patch(context.TODO(), old.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status") + return err +}