Fix an e2e flake on updating node status

Wei Huang 2020-06-09 10:54:41 -07:00
parent 6ac3ca4b17
commit 43da0fff29
2 changed files with 39 additions and 21 deletions


@@ -31,8 +31,10 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/fields:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/version:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",


@@ -18,6 +18,7 @@ package scheduling
import (
"context"
"encoding/json"
"fmt"
"strings"
"sync/atomic"
@@ -30,7 +31,9 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
clientset "k8s.io/client-go/kubernetes"
@@ -40,7 +43,6 @@ import (
e2enode "k8s.io/kubernetes/test/e2e/framework/node"
e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
e2ereplicaset "k8s.io/kubernetes/test/e2e/framework/replicaset"
e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
"github.com/onsi/ginkgo"
"github.com/onsi/gomega"
@@ -77,8 +79,10 @@ var _ = SIGDescribe("SchedulerPreemption [Serial]", func() {
cs.SchedulingV1().PriorityClasses().Delete(context.TODO(), pair.name, *metav1.NewDeleteOptions(0))
}
for _, node := range nodeList.Items {
delete(node.Status.Capacity, testExtendedResource)
cs.CoreV1().Nodes().UpdateStatus(context.TODO(), &node, metav1.UpdateOptions{})
nodeCopy := node.DeepCopy()
delete(nodeCopy.Status.Capacity, testExtendedResource)
err := patchNode(cs, &node, nodeCopy)
framework.ExpectNoError(err)
}
})
@@ -119,8 +123,9 @@ var _ = SIGDescribe("SchedulerPreemption [Serial]", func() {
// Now create victim pods on each of the node with lower priority
for i, node := range nodeList.Items {
// Update each node to advertise 3 available extended resources
node.Status.Capacity[testExtendedResource] = resource.MustParse("3")
node, err := cs.CoreV1().Nodes().UpdateStatus(context.TODO(), &node, metav1.UpdateOptions{})
nodeCopy := node.DeepCopy()
nodeCopy.Status.Capacity[testExtendedResource] = resource.MustParse("3")
err := patchNode(cs, &node, nodeCopy)
framework.ExpectNoError(err)
// Request 2 of the available resources for the victim pods
@@ -204,8 +209,9 @@ var _ = SIGDescribe("SchedulerPreemption [Serial]", func() {
pods := make([]*v1.Pod, 0, len(nodeList.Items))
for i, node := range nodeList.Items {
// Update each node to advertise 3 available extended resources
node.Status.Capacity[testExtendedResource] = resource.MustParse("3")
node, err := cs.CoreV1().Nodes().UpdateStatus(context.TODO(), &node, metav1.UpdateOptions{})
nodeCopy := node.DeepCopy()
nodeCopy.Status.Capacity[testExtendedResource] = resource.MustParse("3")
err := patchNode(cs, &node, nodeCopy)
framework.ExpectNoError(err)
// Request 2 of the available resources for the victim pods
@@ -241,7 +247,7 @@ var _ = SIGDescribe("SchedulerPreemption [Serial]", func() {
framework.Logf("Created pod: %v", pods[i].Name)
}
if len(pods) < 2 {
e2eskipper.Skipf("We need at least two pods to be created but" +
framework.Failf("We need at least two pods to be created but" +
"all nodes are already heavily utilized, so preemption tests cannot be run")
}
ginkgo.By("Wait for pods to be scheduled.")
@@ -305,12 +311,10 @@ var _ = SIGDescribe("SchedulerPreemption [Serial]", func() {
node, err := cs.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
framework.ExpectNoError(err)
// update Node API object with a fake resource
nodeCopy := node.DeepCopy()
// force it to update
nodeCopy.ResourceVersion = "0"
ginkgo.By(fmt.Sprintf("Apply 10 fake resource to node %v.", node.Name))
nodeCopy := node.DeepCopy()
nodeCopy.Status.Capacity[fakeRes] = resource.MustParse("10")
node, err = cs.CoreV1().Nodes().UpdateStatus(context.TODO(), nodeCopy, metav1.UpdateOptions{})
err = patchNode(cs, node, nodeCopy)
framework.ExpectNoError(err)
nodes = append(nodes, node)
}
@@ -321,10 +325,8 @@ var _ = SIGDescribe("SchedulerPreemption [Serial]", func() {
}
for _, node := range nodes {
nodeCopy := node.DeepCopy()
// force it to update
nodeCopy.ResourceVersion = "0"
delete(nodeCopy.Status.Capacity, fakeRes)
_, err := cs.CoreV1().Nodes().UpdateStatus(context.TODO(), nodeCopy, metav1.UpdateOptions{})
err := patchNode(cs, node, nodeCopy)
framework.ExpectNoError(err)
}
})
@@ -470,10 +472,8 @@ var _ = SIGDescribe("SchedulerPreemption [Serial]", func() {
if node != nil {
nodeCopy := node.DeepCopy()
// force it to update
nodeCopy.ResourceVersion = "0"
delete(nodeCopy.Status.Capacity, fakecpu)
_, err := cs.CoreV1().Nodes().UpdateStatus(context.TODO(), nodeCopy, metav1.UpdateOptions{})
err := patchNode(cs, node, nodeCopy)
framework.ExpectNoError(err)
}
for _, pair := range priorityPairs {
@@ -504,10 +504,8 @@ var _ = SIGDescribe("SchedulerPreemption [Serial]", func() {
// update Node API object with a fake resource
nodeCopy := node.DeepCopy()
// force it to update
nodeCopy.ResourceVersion = "0"
nodeCopy.Status.Capacity[fakecpu] = resource.MustParse("1000")
node, err = cs.CoreV1().Nodes().UpdateStatus(context.TODO(), nodeCopy, metav1.UpdateOptions{})
err = patchNode(cs, node, nodeCopy)
framework.ExpectNoError(err)
// create four PriorityClass: p1, p2, p3, p4
@@ -737,3 +735,21 @@ func waitForPreemptingWithTimeout(f *framework.Framework, pod *v1.Pod, timeout t
})
framework.ExpectNoError(err, "pod %v/%v failed to preempt other pods", pod.Namespace, pod.Name)
}
func patchNode(client clientset.Interface, old *v1.Node, new *v1.Node) error {
oldData, err := json.Marshal(old)
if err != nil {
return err
}
newData, err := json.Marshal(new)
if err != nil {
return err
}
patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, &v1.Node{})
if err != nil {
return fmt.Errorf("failed to create merge patch for node %q: %v", old.Name, err)
}
_, err = client.CoreV1().Nodes().Patch(context.TODO(), old.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status")
return err
}
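
For reference, a minimal sketch (not part of the commit) of how patchNode is meant to be used, mirroring the call sites above; cs, node, and the "example.com/fakecpu" resource name stand in for values already defined in the test file. Because the helper sends a strategic merge patch against the node's status subresource, it carries only the changed fields and does not depend on the node object's ResourceVersion the way the previous UpdateStatus calls did:

// Advertise an illustrative extended resource on the node.
nodeCopy := node.DeepCopy()
nodeCopy.Status.Capacity["example.com/fakecpu"] = resource.MustParse("1000")
err := patchNode(cs, node, nodeCopy)
framework.ExpectNoError(err)

// ... run the scheduling/preemption scenario ...

// Clean up by dropping the resource again.
cleanCopy := node.DeepCopy()
delete(cleanCopy.Status.Capacity, "example.com/fakecpu")
framework.ExpectNoError(patchNode(cs, node, cleanCopy))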