mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-20 02:11:09 +00:00
Merge pull request #86400 from Huang-Wei/fix-flaky-preemption-e2e
Fix a flaky scheduler preemption e2e
This commit is contained in:
commit
2cbd474597
@ -18,6 +18,7 @@ package replicaset
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
appsv1 "k8s.io/api/apps/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
@ -43,8 +44,14 @@ func WaitForReadyReplicaSet(c clientset.Interface, ns, name string) error {
|
||||
|
||||
// WaitForReplicaSetTargetAvailableReplicas waits for .status.availableReplicas of a RS to equal targetReplicaNum
|
||||
func WaitForReplicaSetTargetAvailableReplicas(c clientset.Interface, replicaSet *appsv1.ReplicaSet, targetReplicaNum int32) error {
|
||||
return WaitForReplicaSetTargetAvailableReplicasWithTimeout(c, replicaSet, targetReplicaNum, framework.PollShortTimeout)
|
||||
}
|
||||
|
||||
// WaitForReplicaSetTargetAvailableReplicasWithTimeout waits for .status.availableReplicas of a RS to equal targetReplicaNum
|
||||
// with given timeout.
|
||||
func WaitForReplicaSetTargetAvailableReplicasWithTimeout(c clientset.Interface, replicaSet *appsv1.ReplicaSet, targetReplicaNum int32, timeout time.Duration) error {
|
||||
desiredGeneration := replicaSet.Generation
|
||||
err := wait.PollImmediate(framework.Poll, framework.PollShortTimeout, func() (bool, error) {
|
||||
err := wait.PollImmediate(framework.Poll, timeout, func() (bool, error) {
|
||||
rs, err := c.AppsV1().ReplicaSets(replicaSet.Namespace).Get(replicaSet.Name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return false, err
|
||||
|
@ -19,8 +19,10 @@ package scheduling
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
|
||||
appsv1 "k8s.io/api/apps/v1"
|
||||
@ -289,7 +291,7 @@ var _ = SIGDescribe("PreemptionExecutionPath", func() {
|
||||
nodeCopy := node.DeepCopy()
|
||||
// force it to update
|
||||
nodeCopy.ResourceVersion = "0"
|
||||
nodeCopy.Status.Capacity[fakecpu] = resource.MustParse("800")
|
||||
nodeCopy.Status.Capacity[fakecpu] = resource.MustParse("1000")
|
||||
node, err = cs.CoreV1().Nodes().UpdateStatus(nodeCopy)
|
||||
framework.ExpectNoError(err)
|
||||
|
||||
@ -307,8 +309,8 @@ var _ = SIGDescribe("PreemptionExecutionPath", func() {
|
||||
}
|
||||
})
|
||||
|
||||
ginkgo.It("runs ReplicaSets to verify preemption running path [Flaky]", func() {
|
||||
podNamesSeen := make(map[string]struct{})
|
||||
ginkgo.It("runs ReplicaSets to verify preemption running path", func() {
|
||||
podNamesSeen := []int32{0, 0, 0}
|
||||
stopCh := make(chan struct{})
|
||||
|
||||
// create a pod controller to list/watch pod events from the test framework namespace
|
||||
@ -327,7 +329,13 @@ var _ = SIGDescribe("PreemptionExecutionPath", func() {
|
||||
cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj interface{}) {
|
||||
if pod, ok := obj.(*v1.Pod); ok {
|
||||
podNamesSeen[pod.Name] = struct{}{}
|
||||
if strings.HasPrefix(pod.Name, "rs-pod1") {
|
||||
atomic.AddInt32(&podNamesSeen[0], 1)
|
||||
} else if strings.HasPrefix(pod.Name, "rs-pod2") {
|
||||
atomic.AddInt32(&podNamesSeen[1], 1)
|
||||
} else if strings.HasPrefix(pod.Name, "rs-pod3") {
|
||||
atomic.AddInt32(&podNamesSeen[2], 1)
|
||||
}
|
||||
}
|
||||
},
|
||||
},
|
||||
@ -335,10 +343,10 @@ var _ = SIGDescribe("PreemptionExecutionPath", func() {
|
||||
go podController.Run(stopCh)
|
||||
defer close(stopCh)
|
||||
|
||||
// prepare four ReplicaSet
|
||||
// prepare three ReplicaSet
|
||||
rsConfs := []pauseRSConfig{
|
||||
{
|
||||
Replicas: int32(5),
|
||||
Replicas: int32(1),
|
||||
PodConfig: pausePodConfig{
|
||||
Name: "pod1",
|
||||
Namespace: ns,
|
||||
@ -346,13 +354,13 @@ var _ = SIGDescribe("PreemptionExecutionPath", func() {
|
||||
PriorityClassName: "p1",
|
||||
NodeSelector: map[string]string{"kubernetes.io/hostname": nodeHostNameLabel},
|
||||
Resources: &v1.ResourceRequirements{
|
||||
Requests: v1.ResourceList{fakecpu: resource.MustParse("40")},
|
||||
Limits: v1.ResourceList{fakecpu: resource.MustParse("40")},
|
||||
Requests: v1.ResourceList{fakecpu: resource.MustParse("200")},
|
||||
Limits: v1.ResourceList{fakecpu: resource.MustParse("200")},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Replicas: int32(4),
|
||||
Replicas: int32(1),
|
||||
PodConfig: pausePodConfig{
|
||||
Name: "pod2",
|
||||
Namespace: ns,
|
||||
@ -360,13 +368,13 @@ var _ = SIGDescribe("PreemptionExecutionPath", func() {
|
||||
PriorityClassName: "p2",
|
||||
NodeSelector: map[string]string{"kubernetes.io/hostname": nodeHostNameLabel},
|
||||
Resources: &v1.ResourceRequirements{
|
||||
Requests: v1.ResourceList{fakecpu: resource.MustParse("50")},
|
||||
Limits: v1.ResourceList{fakecpu: resource.MustParse("50")},
|
||||
Requests: v1.ResourceList{fakecpu: resource.MustParse("300")},
|
||||
Limits: v1.ResourceList{fakecpu: resource.MustParse("300")},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Replicas: int32(4),
|
||||
Replicas: int32(1),
|
||||
PodConfig: pausePodConfig{
|
||||
Name: "pod3",
|
||||
Namespace: ns,
|
||||
@ -374,61 +382,68 @@ var _ = SIGDescribe("PreemptionExecutionPath", func() {
|
||||
PriorityClassName: "p3",
|
||||
NodeSelector: map[string]string{"kubernetes.io/hostname": nodeHostNameLabel},
|
||||
Resources: &v1.ResourceRequirements{
|
||||
Requests: v1.ResourceList{fakecpu: resource.MustParse("95")},
|
||||
Limits: v1.ResourceList{fakecpu: resource.MustParse("95")},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Replicas: int32(1),
|
||||
PodConfig: pausePodConfig{
|
||||
Name: "pod4",
|
||||
Namespace: ns,
|
||||
Labels: map[string]string{"name": "pod4"},
|
||||
PriorityClassName: "p4",
|
||||
NodeSelector: map[string]string{"kubernetes.io/hostname": nodeHostNameLabel},
|
||||
Resources: &v1.ResourceRequirements{
|
||||
Requests: v1.ResourceList{fakecpu: resource.MustParse("400")},
|
||||
Limits: v1.ResourceList{fakecpu: resource.MustParse("400")},
|
||||
Requests: v1.ResourceList{fakecpu: resource.MustParse("450")},
|
||||
Limits: v1.ResourceList{fakecpu: resource.MustParse("450")},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
// create ReplicaSet{1,2,3} so as to occupy 780/800 fake resource
|
||||
rsNum := len(rsConfs)
|
||||
for i := 0; i < rsNum-1; i++ {
|
||||
// create ReplicaSet{1,2,3} so as to occupy 950/1000 fake resource
|
||||
for i := range rsConfs {
|
||||
runPauseRS(f, rsConfs[i])
|
||||
}
|
||||
|
||||
framework.Logf("pods created so far: %v", podNamesSeen)
|
||||
framework.Logf("length of pods created so far: %v", len(podNamesSeen))
|
||||
|
||||
// create ReplicaSet4
|
||||
// if runPauseRS failed, it means ReplicaSet4 cannot be scheduled even after 1 minute
|
||||
// which is unacceptable
|
||||
runPauseRS(f, rsConfs[rsNum-1])
|
||||
// create a Preemptor Pod
|
||||
preemptorPodConf := pausePodConfig{
|
||||
Name: "pod4",
|
||||
Namespace: ns,
|
||||
Labels: map[string]string{"name": "pod4"},
|
||||
PriorityClassName: "p4",
|
||||
NodeSelector: map[string]string{"kubernetes.io/hostname": nodeHostNameLabel},
|
||||
Resources: &v1.ResourceRequirements{
|
||||
Requests: v1.ResourceList{fakecpu: resource.MustParse("500")},
|
||||
Limits: v1.ResourceList{fakecpu: resource.MustParse("500")},
|
||||
},
|
||||
}
|
||||
preemptorPod := createPod(f, preemptorPodConf)
|
||||
waitForPreemptingWithTimeout(f, preemptorPod, framework.PodGetTimeout)
|
||||
|
||||
framework.Logf("pods created so far: %v", podNamesSeen)
|
||||
framework.Logf("length of pods created so far: %v", len(podNamesSeen))
|
||||
|
||||
// count pods number of ReplicaSet{1,2,3}, if it's more than expected replicas
|
||||
// then it denotes its pods have been over-preempted
|
||||
// "*2" means pods of ReplicaSet{1,2} are expected to be only preempted once
|
||||
maxRSPodsSeen := []int{5 * 2, 4 * 2, 4}
|
||||
rsPodsSeen := []int{0, 0, 0}
|
||||
for podName := range podNamesSeen {
|
||||
if strings.HasPrefix(podName, "rs-pod1") {
|
||||
rsPodsSeen[0]++
|
||||
} else if strings.HasPrefix(podName, "rs-pod2") {
|
||||
rsPodsSeen[1]++
|
||||
} else if strings.HasPrefix(podName, "rs-pod3") {
|
||||
rsPodsSeen[2]++
|
||||
// count pods number of ReplicaSet{1,2,3}:
|
||||
// - if it's more than expected replicas, it denotes its pods have been over-preempted
|
||||
// - if it's less than expected replicas, it denotes its pods are under-preempted
|
||||
// "*2" means pods of ReplicaSet{1,2} are expected to be only preempted once.
|
||||
expectedRSPods := []int32{1 * 2, 1 * 2, 1}
|
||||
err := wait.Poll(framework.Poll, framework.PollShortTimeout, func() (bool, error) {
|
||||
for i := 0; i < len(podNamesSeen); i++ {
|
||||
got := atomic.LoadInt32(&podNamesSeen[i])
|
||||
if got < expectedRSPods[i] {
|
||||
framework.Logf("waiting for rs%d to observe %d pod creations, got %d", i+1, expectedRSPods[i], got)
|
||||
return false, nil
|
||||
} else if got > expectedRSPods[i] {
|
||||
return false, fmt.Errorf("rs%d had more than %d pods created: %d", i+1, expectedRSPods[i], got)
|
||||
}
|
||||
}
|
||||
return true, nil
|
||||
})
|
||||
if err != nil {
|
||||
framework.Logf("pods created so far: %v", podNamesSeen)
|
||||
framework.Failf("failed pod observation expectations: %v", err)
|
||||
}
|
||||
for i, got := range rsPodsSeen {
|
||||
expected := maxRSPodsSeen[i]
|
||||
if got > expected {
|
||||
framework.Failf("pods of ReplicaSet%d have been over-preempted: expect %v pod names, but got %d", i+1, expected, got)
|
||||
|
||||
// If logic continues to here, we should do a final check to ensure within a time period,
|
||||
// the state is stable; otherwise, pods may be over-preempted.
|
||||
time.Sleep(5 * time.Second)
|
||||
for i := 0; i < len(podNamesSeen); i++ {
|
||||
got := atomic.LoadInt32(&podNamesSeen[i])
|
||||
if got < expectedRSPods[i] {
|
||||
framework.Failf("pods of ReplicaSet%d have been under-preempted: expect %v pod names, but got %d", i+1, expectedRSPods[i], got)
|
||||
} else if got > expectedRSPods[i] {
|
||||
framework.Failf("pods of ReplicaSet%d have been over-preempted: expect %v pod names, but got %d", i+1, expectedRSPods[i], got)
|
||||
}
|
||||
}
|
||||
})
|
||||
@ -472,6 +487,32 @@ func createPauseRS(f *framework.Framework, conf pauseRSConfig) *appsv1.ReplicaSe
|
||||
|
||||
func runPauseRS(f *framework.Framework, conf pauseRSConfig) *appsv1.ReplicaSet {
|
||||
rs := createPauseRS(f, conf)
|
||||
framework.ExpectNoError(replicaset.WaitForReplicaSetTargetAvailableReplicas(f.ClientSet, rs, conf.Replicas))
|
||||
framework.ExpectNoError(replicaset.WaitForReplicaSetTargetAvailableReplicasWithTimeout(f.ClientSet, rs, conf.Replicas, framework.PodGetTimeout))
|
||||
return rs
|
||||
}
|
||||
|
||||
func createPod(f *framework.Framework, conf pausePodConfig) *v1.Pod {
|
||||
namespace := conf.Namespace
|
||||
if len(namespace) == 0 {
|
||||
namespace = f.Namespace.Name
|
||||
}
|
||||
pod, err := f.ClientSet.CoreV1().Pods(namespace).Create(initPausePod(f, conf))
|
||||
framework.ExpectNoError(err)
|
||||
return pod
|
||||
}
|
||||
|
||||
// waitForPreemptingWithTimeout verifies if 'pod' is preempting within 'timeout', specifically it checks
|
||||
// if the 'spec.NodeName' field of preemptor 'pod' has been set.
|
||||
func waitForPreemptingWithTimeout(f *framework.Framework, pod *v1.Pod, timeout time.Duration) {
|
||||
err := wait.Poll(2*time.Second, timeout, func() (bool, error) {
|
||||
pod, err := f.ClientSet.CoreV1().Pods(pod.Namespace).Get(pod.Name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if len(pod.Spec.NodeName) > 0 {
|
||||
return true, nil
|
||||
}
|
||||
return false, err
|
||||
})
|
||||
framework.ExpectNoError(err, "pod %v/%v failed to preempt other pods", pod.Namespace, pod.Name)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user