mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-09 12:07:47 +00:00
Add unit tests for parallel StatefulSet create & delete
This commit is contained in:
parent
63e5b6bdb2
commit
d616cf72a3
@ -2362,6 +2362,12 @@ type requestTracker struct {
|
||||
requests int
|
||||
err error
|
||||
after int
|
||||
|
||||
parallelLock sync.Mutex
|
||||
parallel int
|
||||
maxParallel int
|
||||
|
||||
delay time.Duration
|
||||
}
|
||||
|
||||
func (rt *requestTracker) errorReady() bool {
|
||||
@ -2371,16 +2377,31 @@ func (rt *requestTracker) errorReady() bool {
|
||||
}
|
||||
|
||||
func (rt *requestTracker) inc() {
|
||||
rt.parallelLock.Lock()
|
||||
rt.parallel++
|
||||
if rt.maxParallel < rt.parallel {
|
||||
rt.maxParallel = rt.parallel
|
||||
}
|
||||
rt.parallelLock.Unlock()
|
||||
|
||||
rt.Lock()
|
||||
defer rt.Unlock()
|
||||
rt.requests++
|
||||
if rt.delay != 0 {
|
||||
time.Sleep(rt.delay)
|
||||
}
|
||||
}
|
||||
|
||||
func (rt *requestTracker) reset() {
|
||||
rt.parallelLock.Lock()
|
||||
rt.parallel = 0
|
||||
rt.parallelLock.Unlock()
|
||||
|
||||
rt.Lock()
|
||||
defer rt.Unlock()
|
||||
rt.err = nil
|
||||
rt.after = 0
|
||||
rt.delay = 0
|
||||
}
|
||||
|
||||
func newRequestTracker(requests int, err error, after int) requestTracker {
|
||||
@ -2851,6 +2872,182 @@ func fakeResourceVersion(object interface{}) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestParallelScale(t *testing.T) {
|
||||
for _, tc := range []struct {
|
||||
desc string
|
||||
replicas int32
|
||||
desiredReplicas int32
|
||||
}{
|
||||
{
|
||||
desc: "scale up from 3 to 30",
|
||||
replicas: 3,
|
||||
desiredReplicas: 30,
|
||||
},
|
||||
{
|
||||
desc: "scale down from 10 to 1",
|
||||
replicas: 10,
|
||||
desiredReplicas: 1,
|
||||
},
|
||||
|
||||
{
|
||||
desc: "scale down to 0",
|
||||
replicas: 501,
|
||||
desiredReplicas: 0,
|
||||
},
|
||||
{
|
||||
desc: "scale up from 0",
|
||||
replicas: 0,
|
||||
desiredReplicas: 1000,
|
||||
},
|
||||
} {
|
||||
t.Run(tc.desc, func(t *testing.T) {
|
||||
set := burst(newStatefulSet(0))
|
||||
parallelScale(t, set, tc.replicas, tc.desiredReplicas, assertBurstInvariants)
|
||||
})
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func parallelScale(t *testing.T, set *apps.StatefulSet, replicas, desiredReplicas int32, invariants invariantFunc) {
|
||||
var err error
|
||||
diff := desiredReplicas - replicas
|
||||
client := fake.NewSimpleClientset(set)
|
||||
om, _, ssc := setupController(client)
|
||||
om.createPodTracker.delay = time.Millisecond
|
||||
|
||||
*set.Spec.Replicas = replicas
|
||||
if err := parallelScaleUpStatefulSetControl(set, ssc, om, invariants); err != nil {
|
||||
t.Errorf("Failed to turn up StatefulSet : %s", err)
|
||||
}
|
||||
set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
|
||||
if err != nil {
|
||||
t.Fatalf("Error getting updated StatefulSet: %v", err)
|
||||
}
|
||||
if set.Status.Replicas != replicas {
|
||||
t.Errorf("want %v, got %v replicas", replicas, set.Status.Replicas)
|
||||
}
|
||||
|
||||
fn := parallelScaleUpStatefulSetControl
|
||||
if diff < 0 {
|
||||
fn = parallelScaleDownStatefulSetControl
|
||||
}
|
||||
*set.Spec.Replicas = desiredReplicas
|
||||
if err := fn(set, ssc, om, invariants); err != nil {
|
||||
t.Errorf("Failed to scale StatefulSet : %s", err)
|
||||
}
|
||||
|
||||
set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
|
||||
if err != nil {
|
||||
t.Fatalf("Error getting updated StatefulSet: %v", err)
|
||||
}
|
||||
|
||||
if set.Status.Replicas != desiredReplicas {
|
||||
t.Errorf("Failed to scale statefulset to %v replicas, got %v replicas", desiredReplicas, set.Status.Replicas)
|
||||
}
|
||||
|
||||
if (diff < -1 || diff > 1) && om.createPodTracker.maxParallel <= 1 {
|
||||
t.Errorf("want max parallel requests > 1, got %v", om.createPodTracker.maxParallel)
|
||||
}
|
||||
}
|
||||
|
||||
func parallelScaleUpStatefulSetControl(set *apps.StatefulSet,
|
||||
ssc StatefulSetControlInterface,
|
||||
om *fakeObjectManager,
|
||||
invariants invariantFunc) error {
|
||||
selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Give up after 2 loops.
|
||||
// 2 * 500 pods per loop = 1000 max pods <- this should be enough for all test cases.
|
||||
// Anything slower than that (requiring more iterations) indicates a problem and should fail the test.
|
||||
maxLoops := 2
|
||||
loops := maxLoops
|
||||
for set.Status.Replicas < *set.Spec.Replicas {
|
||||
if loops < 1 {
|
||||
return fmt.Errorf("after %v loops: want %v, got replicas %v", maxLoops, *set.Spec.Replicas, set.Status.Replicas)
|
||||
}
|
||||
loops--
|
||||
pods, err := om.podsLister.Pods(set.Namespace).List(selector)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
sort.Sort(ascendingOrdinal(pods))
|
||||
|
||||
ordinals := []int{}
|
||||
for _, pod := range pods {
|
||||
if pod.Status.Phase == "" {
|
||||
ordinals = append(ordinals, getOrdinal(pod))
|
||||
}
|
||||
}
|
||||
// ensure all pods are valid (have a phase)
|
||||
for _, ord := range ordinals {
|
||||
if pods, err = om.setPodPending(set, ord); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// run the controller once and check invariants
|
||||
_, err = ssc.UpdateStatefulSet(context.TODO(), set, pods)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := invariants(set, om); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return invariants(set, om)
|
||||
}
|
||||
|
||||
func parallelScaleDownStatefulSetControl(set *apps.StatefulSet, ssc StatefulSetControlInterface, om *fakeObjectManager, invariants invariantFunc) error {
|
||||
selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Give up after 2 loops.
|
||||
// 2 * 500 pods per loop = 1000 max pods <- this should be enough for all test cases.
|
||||
// Anything slower than that (requiring more iterations) indicates a problem and should fail the test.
|
||||
maxLoops := 2
|
||||
loops := maxLoops
|
||||
for set.Status.Replicas > *set.Spec.Replicas {
|
||||
if loops < 1 {
|
||||
return fmt.Errorf("after %v loops: want %v replicas, got %v", maxLoops, *set.Spec.Replicas, set.Status.Replicas)
|
||||
}
|
||||
loops--
|
||||
pods, err := om.podsLister.Pods(set.Namespace).List(selector)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
sort.Sort(ascendingOrdinal(pods))
|
||||
if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
|
||||
return err
|
||||
}
|
||||
set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := invariants(set, om); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func scaleUpStatefulSetControl(set *apps.StatefulSet,
|
||||
ssc StatefulSetControlInterface,
|
||||
om *fakeObjectManager,
|
||||
|
Loading…
Reference in New Issue
Block a user