From d616cf72a3b058d49594c7af4846e71eaf236a1d Mon Sep 17 00:00:00 2001 From: Aleksandra Malinowska Date: Tue, 27 Jun 2023 13:35:42 +0200 Subject: [PATCH] Add unit tests for parallel StatefulSet create & delete --- .../statefulset/stateful_set_control_test.go | 197 ++++++++++++++++++ 1 file changed, 197 insertions(+) diff --git a/pkg/controller/statefulset/stateful_set_control_test.go b/pkg/controller/statefulset/stateful_set_control_test.go index bebaa4469d5..c7fe131996c 100644 --- a/pkg/controller/statefulset/stateful_set_control_test.go +++ b/pkg/controller/statefulset/stateful_set_control_test.go @@ -2362,6 +2362,12 @@ type requestTracker struct { requests int err error after int + + parallelLock sync.Mutex + parallel int + maxParallel int + + delay time.Duration } func (rt *requestTracker) errorReady() bool { @@ -2371,16 +2377,31 @@ func (rt *requestTracker) errorReady() bool { } func (rt *requestTracker) inc() { + rt.parallelLock.Lock() + rt.parallel++ + if rt.maxParallel < rt.parallel { + rt.maxParallel = rt.parallel + } + rt.parallelLock.Unlock() + rt.Lock() defer rt.Unlock() rt.requests++ + if rt.delay != 0 { + time.Sleep(rt.delay) + } } func (rt *requestTracker) reset() { + rt.parallelLock.Lock() + rt.parallel = 0 + rt.parallelLock.Unlock() + rt.Lock() defer rt.Unlock() rt.err = nil rt.after = 0 + rt.delay = 0 } func newRequestTracker(requests int, err error, after int) requestTracker { @@ -2851,6 +2872,182 @@ func fakeResourceVersion(object interface{}) { } } +func TestParallelScale(t *testing.T) { + for _, tc := range []struct { + desc string + replicas int32 + desiredReplicas int32 + }{ + { + desc: "scale up from 3 to 30", + replicas: 3, + desiredReplicas: 30, + }, + { + desc: "scale down from 10 to 1", + replicas: 10, + desiredReplicas: 1, + }, + + { + desc: "scale down to 0", + replicas: 501, + desiredReplicas: 0, + }, + { + desc: "scale up from 0", + replicas: 0, + desiredReplicas: 1000, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + set := burst(newStatefulSet(0)) + parallelScale(t, set, tc.replicas, tc.desiredReplicas, assertBurstInvariants) + }) + } + +} + +func parallelScale(t *testing.T, set *apps.StatefulSet, replicas, desiredReplicas int32, invariants invariantFunc) { + var err error + diff := desiredReplicas - replicas + client := fake.NewSimpleClientset(set) + om, _, ssc := setupController(client) + om.createPodTracker.delay = time.Millisecond + + *set.Spec.Replicas = replicas + if err := parallelScaleUpStatefulSetControl(set, ssc, om, invariants); err != nil { + t.Errorf("Failed to turn up StatefulSet : %s", err) + } + set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name) + if err != nil { + t.Fatalf("Error getting updated StatefulSet: %v", err) + } + if set.Status.Replicas != replicas { + t.Errorf("want %v, got %v replicas", replicas, set.Status.Replicas) + } + + fn := parallelScaleUpStatefulSetControl + if diff < 0 { + fn = parallelScaleDownStatefulSetControl + } + *set.Spec.Replicas = desiredReplicas + if err := fn(set, ssc, om, invariants); err != nil { + t.Errorf("Failed to scale StatefulSet : %s", err) + } + + set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name) + if err != nil { + t.Fatalf("Error getting updated StatefulSet: %v", err) + } + + if set.Status.Replicas != desiredReplicas { + t.Errorf("Failed to scale statefulset to %v replicas, got %v replicas", desiredReplicas, set.Status.Replicas) + } + + if (diff < -1 || diff > 1) && om.createPodTracker.maxParallel <= 1 { + t.Errorf("want max parallel requests > 1, got %v", om.createPodTracker.maxParallel) + } +} + +func parallelScaleUpStatefulSetControl(set *apps.StatefulSet, + ssc StatefulSetControlInterface, + om *fakeObjectManager, + invariants invariantFunc) error { + selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) + if err != nil { + return err + } + + // Give up after 2 loops. + // 2 * 500 pods per loop = 1000 max pods <- this should be enough for all test cases. + // Anything slower than that (requiring more iterations) indicates a problem and should fail the test. + maxLoops := 2 + loops := maxLoops + for set.Status.Replicas < *set.Spec.Replicas { + if loops < 1 { + return fmt.Errorf("after %v loops: want %v, got replicas %v", maxLoops, *set.Spec.Replicas, set.Status.Replicas) + } + loops-- + pods, err := om.podsLister.Pods(set.Namespace).List(selector) + if err != nil { + return err + } + sort.Sort(ascendingOrdinal(pods)) + + ordinals := []int{} + for _, pod := range pods { + if pod.Status.Phase == "" { + ordinals = append(ordinals, getOrdinal(pod)) + } + } + // ensure all pods are valid (have a phase) + for _, ord := range ordinals { + if pods, err = om.setPodPending(set, ord); err != nil { + return err + } + } + + // run the controller once and check invariants + _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods) + if err != nil { + return err + } + set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name) + if err != nil { + return err + } + if err := invariants(set, om); err != nil { + return err + } + } + return invariants(set, om) +} + +func parallelScaleDownStatefulSetControl(set *apps.StatefulSet, ssc StatefulSetControlInterface, om *fakeObjectManager, invariants invariantFunc) error { + selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) + if err != nil { + return err + } + + // Give up after 2 loops. + // 2 * 500 pods per loop = 1000 max pods <- this should be enough for all test cases. + // Anything slower than that (requiring more iterations) indicates a problem and should fail the test. + maxLoops := 2 + loops := maxLoops + for set.Status.Replicas > *set.Spec.Replicas { + if loops < 1 { + return fmt.Errorf("after %v loops: want %v replicas, got %v", maxLoops, *set.Spec.Replicas, set.Status.Replicas) + } + loops-- + pods, err := om.podsLister.Pods(set.Namespace).List(selector) + if err != nil { + return err + } + sort.Sort(ascendingOrdinal(pods)) + if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil { + return err + } + set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name) + if err != nil { + return err + } + if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil { + return err + } + } + + set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name) + if err != nil { + return err + } + if err := invariants(set, om); err != nil { + return err + } + + return nil +} + func scaleUpStatefulSetControl(set *apps.StatefulSet, ssc StatefulSetControlInterface, om *fakeObjectManager,