diff --git a/pkg/controller/statefulset/stateful_set_control_test.go b/pkg/controller/statefulset/stateful_set_control_test.go index 4ec84d541c4..2f21701c979 100644 --- a/pkg/controller/statefulset/stateful_set_control_test.go +++ b/pkg/controller/statefulset/stateful_set_control_test.go @@ -19,8 +19,12 @@ package statefulset import ( "errors" "fmt" + "math/rand" + "reflect" + "runtime" "sort" "strconv" + "strings" "testing" "time" @@ -28,9 +32,11 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/tools/cache" + "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/v1" podutil "k8s.io/kubernetes/pkg/api/v1/pod" apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1" + "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake" informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions" appsinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/apps/v1beta1" @@ -40,24 +46,75 @@ import ( "k8s.io/kubernetes/pkg/controller" ) -func TestDefaultStatefulSetControlCreatesPods(t *testing.T) { - set := newStatefulSet(3) - client := fake.NewSimpleClientset(set) +type invariantFunc func(set *apps.StatefulSet, spc *fakeStatefulPodControl) error +func setupController(client clientset.Interface) (*fakeStatefulPodControl, StatefulSetControlInterface, chan struct{}) { informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc()) spc := newFakeStatefulPodControl(informerFactory.Core().V1().Pods(), informerFactory.Apps().V1beta1().StatefulSets()) ssc := NewDefaultStatefulSetControl(spc) stop := make(chan struct{}) - defer close(stop) informerFactory.Start(stop) cache.WaitForCacheSync( stop, informerFactory.Apps().V1beta1().StatefulSets().Informer().HasSynced, informerFactory.Core().V1().Pods().Informer().HasSynced, ) + return spc, ssc, stop +} - if err := scaleUpStatefulSetControl(set, ssc, spc); err != nil { +func burst(set *apps.StatefulSet) *apps.StatefulSet { + set.Spec.PodManagementPolicy = apps.ParallelPodManagement + return set +} + +func TestStatefulSetControl(t *testing.T) { + simpleSetFn := func() *apps.StatefulSet { return newStatefulSet(3) } + largeSetFn := func() *apps.StatefulSet { return newStatefulSet(5) } + + testCases := []struct { + fn func(*testing.T, *apps.StatefulSet, invariantFunc) + obj func() *apps.StatefulSet + }{ + {CreatesPods, simpleSetFn}, + {ScalesUp, simpleSetFn}, + {ScalesDown, simpleSetFn}, + {ReplacesPods, largeSetFn}, + {RecreatesFailedPod, simpleSetFn}, + {SetsInitAnnotation, simpleSetFn}, + {CreatePodFailure, simpleSetFn}, + {UpdatePodFailure, simpleSetFn}, + {UpdateSetStatusFailure, simpleSetFn}, + {PodRecreateDeleteFailure, simpleSetFn}, + } + + for _, testCase := range testCases { + fnName := runtime.FuncForPC(reflect.ValueOf(testCase.fn).Pointer()).Name() + if i := strings.LastIndex(fnName, "."); i != -1 { + fnName = fnName[i+1:] + } + t.Run( + fmt.Sprintf("%s/Monotonic", fnName), + func(t *testing.T) { + testCase.fn(t, testCase.obj(), assertMonotonicInvariants) + }, + ) + t.Run( + fmt.Sprintf("%s/Burst", fnName), + func(t *testing.T) { + set := burst(testCase.obj()) + testCase.fn(t, set, assertBurstInvariants) + }, + ) + } +} + +func CreatesPods(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) { + client := fake.NewSimpleClientset(set) + spc, ssc, stop := setupController(client) + defer close(stop) + + if err := scaleUpStatefulSetControl(t, set, ssc, spc, invariants); err != nil { t.Errorf("Failed to turn up StatefulSet : %s", err) } var err error @@ -70,28 +127,16 @@ func TestDefaultStatefulSetControlCreatesPods(t *testing.T) { } } -func TestStatefulSetControlScaleUp(t *testing.T) { - set := newStatefulSet(3) +func ScalesUp(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) { client := fake.NewSimpleClientset(set) - - informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc()) - spc := newFakeStatefulPodControl(informerFactory.Core().V1().Pods(), informerFactory.Apps().V1beta1().StatefulSets()) - ssc := NewDefaultStatefulSetControl(spc) - - stop := make(chan struct{}) + spc, ssc, stop := setupController(client) defer close(stop) - informerFactory.Start(stop) - cache.WaitForCacheSync( - stop, - informerFactory.Apps().V1beta1().StatefulSets().Informer().HasSynced, - informerFactory.Core().V1().Pods().Informer().HasSynced, - ) - if err := scaleUpStatefulSetControl(set, ssc, spc); err != nil { + if err := scaleUpStatefulSetControl(t, set, ssc, spc, invariants); err != nil { t.Errorf("Failed to turn up StatefulSet : %s", err) } *set.Spec.Replicas = 4 - if err := scaleUpStatefulSetControl(set, ssc, spc); err != nil { + if err := scaleUpStatefulSetControl(t, set, ssc, spc, invariants); err != nil { t.Errorf("Failed to scale StatefulSet : %s", err) } var err error @@ -104,28 +149,16 @@ func TestStatefulSetControlScaleUp(t *testing.T) { } } -func TestStatefulSetControlScaleDown(t *testing.T) { - set := newStatefulSet(3) +func ScalesDown(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) { client := fake.NewSimpleClientset(set) - - informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc()) - spc := newFakeStatefulPodControl(informerFactory.Core().V1().Pods(), informerFactory.Apps().V1beta1().StatefulSets()) - ssc := NewDefaultStatefulSetControl(spc) - - stop := make(chan struct{}) + spc, ssc, stop := setupController(client) defer close(stop) - informerFactory.Start(stop) - cache.WaitForCacheSync( - stop, - informerFactory.Apps().V1beta1().StatefulSets().Informer().HasSynced, - informerFactory.Core().V1().Pods().Informer().HasSynced, - ) - if err := scaleUpStatefulSetControl(set, ssc, spc); err != nil { + if err := scaleUpStatefulSetControl(t, set, ssc, spc, invariants); err != nil { t.Errorf("Failed to turn up StatefulSet : %s", err) } *set.Spec.Replicas = 0 - if err := scaleDownStatefulSetControl(set, ssc, spc); err != nil { + if err := scaleDownStatefulSetControl(t, set, ssc, spc, invariants); err != nil { t.Errorf("Failed to scale StatefulSet : %s", err) } if set.Status.Replicas != 0 { @@ -133,24 +166,12 @@ func TestStatefulSetControlScaleDown(t *testing.T) { } } -func TestStatefulSetControlReplacesPods(t *testing.T) { - set := newStatefulSet(5) +func ReplacesPods(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) { client := fake.NewSimpleClientset(set) - - informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc()) - spc := newFakeStatefulPodControl(informerFactory.Core().V1().Pods(), informerFactory.Apps().V1beta1().StatefulSets()) - ssc := NewDefaultStatefulSetControl(spc) - - stop := make(chan struct{}) + spc, ssc, stop := setupController(client) defer close(stop) - informerFactory.Start(stop) - cache.WaitForCacheSync( - stop, - informerFactory.Apps().V1beta1().StatefulSets().Informer().HasSynced, - informerFactory.Core().V1().Pods().Informer().HasSynced, - ) - if err := scaleUpStatefulSetControl(set, ssc, spc); err != nil { + if err := scaleUpStatefulSetControl(t, set, ssc, spc, invariants); err != nil { t.Errorf("Failed to turn up StatefulSet : %s", err) } var err error @@ -215,12 +236,11 @@ func TestStatefulSetControlReplacesPods(t *testing.T) { } } -func TestDefaultStatefulSetControlRecreatesFailedPod(t *testing.T) { +func RecreatesFailedPod(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) { client := fake.NewSimpleClientset() informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc()) spc := newFakeStatefulPodControl(informerFactory.Core().V1().Pods(), informerFactory.Apps().V1beta1().StatefulSets()) ssc := NewDefaultStatefulSetControl(spc) - set := newStatefulSet(3) selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) if err != nil { t.Error(err) @@ -232,7 +252,7 @@ func TestDefaultStatefulSetControlRecreatesFailedPod(t *testing.T) { if err := ssc.UpdateStatefulSet(set, pods); err != nil { t.Errorf("Error updating StatefulSet %s", err) } - if err := assertInvariants(set, spc); err != nil { + if err := invariants(set, spc); err != nil { t.Error(err) } pods, err = spc.podsLister.Pods(set.Namespace).List(selector) @@ -244,7 +264,7 @@ func TestDefaultStatefulSetControlRecreatesFailedPod(t *testing.T) { if err := ssc.UpdateStatefulSet(set, pods); err != nil { t.Errorf("Error updating StatefulSet %s", err) } - if err := assertInvariants(set, spc); err != nil { + if err := invariants(set, spc); err != nil { t.Error(err) } pods, err = spc.podsLister.Pods(set.Namespace).List(selector) @@ -256,22 +276,10 @@ func TestDefaultStatefulSetControlRecreatesFailedPod(t *testing.T) { } } -func TestDefaultStatefulSetControlInitAnnotation(t *testing.T) { - set := newStatefulSet(3) +func SetsInitAnnotation(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) { client := fake.NewSimpleClientset(set) - - informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc()) - spc := newFakeStatefulPodControl(informerFactory.Core().V1().Pods(), informerFactory.Apps().V1beta1().StatefulSets()) - ssc := NewDefaultStatefulSetControl(spc) - - stop := make(chan struct{}) + spc, ssc, stop := setupController(client) defer close(stop) - informerFactory.Start(stop) - cache.WaitForCacheSync( - stop, - informerFactory.Apps().V1beta1().StatefulSets().Informer().HasSynced, - informerFactory.Core().V1().Pods().Informer().HasSynced, - ) selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) if err != nil { @@ -284,7 +292,7 @@ func TestDefaultStatefulSetControlInitAnnotation(t *testing.T) { if err = ssc.UpdateStatefulSet(set, pods); err != nil { t.Errorf("Error updating StatefulSet %s", err) } - if err = assertInvariants(set, spc); err != nil { + if err = invariants(set, spc); err != nil { t.Error(err) } if pods, err = spc.setPodRunning(set, 0); err != nil { @@ -300,7 +308,7 @@ func TestDefaultStatefulSetControlInitAnnotation(t *testing.T) { if err := ssc.UpdateStatefulSet(set, pods); err != nil { t.Errorf("Error updating StatefulSet %s", err) } - if err := assertInvariants(set, spc); err != nil { + if err := invariants(set, spc); err != nil { t.Error(err) } if replicas != int(set.Status.Replicas) { @@ -309,7 +317,7 @@ func TestDefaultStatefulSetControlInitAnnotation(t *testing.T) { if pods, err = spc.setPodInitStatus(set, 0, true); err != nil { t.Error(err) } - if err := scaleUpStatefulSetControl(set, ssc, spc); err != nil { + if err := scaleUpStatefulSetControl(t, set, ssc, spc, invariants); err != nil { t.Errorf("Failed to turn up StatefulSet : %s", err) } set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) @@ -321,28 +329,16 @@ func TestDefaultStatefulSetControlInitAnnotation(t *testing.T) { } } -func TestDefaultStatefulSetControlCreatePodFailure(t *testing.T) { - set := newStatefulSet(3) +func CreatePodFailure(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) { client := fake.NewSimpleClientset(set) - - informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc()) - spc := newFakeStatefulPodControl(informerFactory.Core().V1().Pods(), informerFactory.Apps().V1beta1().StatefulSets()) - ssc := NewDefaultStatefulSetControl(spc) + spc, ssc, stop := setupController(client) + defer close(stop) spc.SetCreateStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 2) - stop := make(chan struct{}) - defer close(stop) - informerFactory.Start(stop) - cache.WaitForCacheSync( - stop, - informerFactory.Apps().V1beta1().StatefulSets().Informer().HasSynced, - informerFactory.Core().V1().Pods().Informer().HasSynced, - ) - - if err := scaleUpStatefulSetControl(set, ssc, spc); !apierrors.IsInternalError(err) { + if err := scaleUpStatefulSetControl(t, set, ssc, spc, invariants); !apierrors.IsInternalError(err) { t.Errorf("StatefulSetControl did not return InternalError found %s", err) } - if err := scaleUpStatefulSetControl(set, ssc, spc); err != nil { + if err := scaleUpStatefulSetControl(t, set, ssc, spc, invariants); err != nil { t.Errorf("Failed to turn up StatefulSet : %s", err) } var err error @@ -355,26 +351,14 @@ func TestDefaultStatefulSetControlCreatePodFailure(t *testing.T) { } } -func TestDefaultStatefulSetControlUpdatePodFailure(t *testing.T) { - set := newStatefulSet(3) +func UpdatePodFailure(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) { client := fake.NewSimpleClientset(set) - - informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc()) - spc := newFakeStatefulPodControl(informerFactory.Core().V1().Pods(), informerFactory.Apps().V1beta1().StatefulSets()) - ssc := NewDefaultStatefulSetControl(spc) + spc, ssc, stop := setupController(client) + defer close(stop) spc.SetUpdateStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 0) - stop := make(chan struct{}) - defer close(stop) - informerFactory.Start(stop) - cache.WaitForCacheSync( - stop, - informerFactory.Apps().V1beta1().StatefulSets().Informer().HasSynced, - informerFactory.Core().V1().Pods().Informer().HasSynced, - ) - // have to have 1 successful loop first - if err := scaleUpStatefulSetControl(set, ssc, spc); err != nil { + if err := scaleUpStatefulSetControl(t, set, ssc, spc, invariants); err != nil { t.Fatalf("Unexpected error: %v", err) } var err error @@ -404,25 +388,13 @@ func TestDefaultStatefulSetControlUpdatePodFailure(t *testing.T) { } } -func TestDefaultStatefulSetControlBlocksOnTerminating(t *testing.T) { - set := newStatefulSet(3) +func testDefaultStatefulSetControlBlocksOnTerminating(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) { client := fake.NewSimpleClientset(set) - - informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc()) - spc := newFakeStatefulPodControl(informerFactory.Core().V1().Pods(), informerFactory.Apps().V1beta1().StatefulSets()) - ssc := NewDefaultStatefulSetControl(spc) + spc, ssc, stop := setupController(client) + defer close(stop) spc.SetUpdateStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 0) - stop := make(chan struct{}) - defer close(stop) - informerFactory.Start(stop) - cache.WaitForCacheSync( - stop, - informerFactory.Apps().V1beta1().StatefulSets().Informer().HasSynced, - informerFactory.Core().V1().Pods().Informer().HasSynced, - ) - - if err := scaleUpStatefulSetControl(set, ssc, spc); err != nil { + if err := scaleUpStatefulSetControl(t, set, ssc, spc, invariants); err != nil { t.Fatalf("Unexpected error: %v", err) } var err error @@ -458,7 +430,7 @@ func TestDefaultStatefulSetControlBlocksOnTerminating(t *testing.T) { if len(pods) != 2 { t.Fatalf("Expected 3 pods, got %d", len(pods)) } - if err := scaleUpStatefulSetControl(set, ssc, spc); err != nil { + if err := scaleUpStatefulSetControl(t, set, ssc, spc, invariants); err != nil { t.Fatalf("Unexpected error: %v", err) } set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) @@ -470,28 +442,16 @@ func TestDefaultStatefulSetControlBlocksOnTerminating(t *testing.T) { } } -func TestDefaultStatefulSetControlUpdateSetStatusFailure(t *testing.T) { - set := newStatefulSet(3) +func UpdateSetStatusFailure(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) { client := fake.NewSimpleClientset(set) - - informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc()) - spc := newFakeStatefulPodControl(informerFactory.Core().V1().Pods(), informerFactory.Apps().V1beta1().StatefulSets()) - ssc := NewDefaultStatefulSetControl(spc) + spc, ssc, stop := setupController(client) + defer close(stop) spc.SetUpdateStatefulSetStatusError(apierrors.NewInternalError(errors.New("API server failed")), 2) - stop := make(chan struct{}) - defer close(stop) - informerFactory.Start(stop) - cache.WaitForCacheSync( - stop, - informerFactory.Apps().V1beta1().StatefulSets().Informer().HasSynced, - informerFactory.Core().V1().Pods().Informer().HasSynced, - ) - - if err := scaleUpStatefulSetControl(set, ssc, spc); !apierrors.IsInternalError(err) { + if err := scaleUpStatefulSetControl(t, set, ssc, spc, invariants); !apierrors.IsInternalError(err) { t.Errorf("StatefulSetControl did not return InternalError found %s", err) } - if err := scaleUpStatefulSetControl(set, ssc, spc); err != nil { + if err := scaleUpStatefulSetControl(t, set, ssc, spc, invariants); err != nil { t.Errorf("Failed to turn up StatefulSet : %s", err) } var err error @@ -504,22 +464,10 @@ func TestDefaultStatefulSetControlUpdateSetStatusFailure(t *testing.T) { } } -func TestDefaultStatefulSetControlPodRecreateDeleteError(t *testing.T) { - set := newStatefulSet(3) +func PodRecreateDeleteFailure(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) { client := fake.NewSimpleClientset(set) - - informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc()) - spc := newFakeStatefulPodControl(informerFactory.Core().V1().Pods(), informerFactory.Apps().V1beta1().StatefulSets()) - ssc := NewDefaultStatefulSetControl(spc) - - stop := make(chan struct{}) + spc, ssc, stop := setupController(client) defer close(stop) - informerFactory.Start(stop) - cache.WaitForCacheSync( - stop, - informerFactory.Apps().V1beta1().StatefulSets().Informer().HasSynced, - informerFactory.Core().V1().Pods().Informer().HasSynced, - ) selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) if err != nil { @@ -532,7 +480,7 @@ func TestDefaultStatefulSetControlPodRecreateDeleteError(t *testing.T) { if err := ssc.UpdateStatefulSet(set, pods); err != nil { t.Errorf("Error updating StatefulSet %s", err) } - if err := assertInvariants(set, spc); err != nil { + if err := invariants(set, spc); err != nil { t.Error(err) } pods, err = spc.podsLister.Pods(set.Namespace).List(selector) @@ -545,13 +493,13 @@ func TestDefaultStatefulSetControlPodRecreateDeleteError(t *testing.T) { if err := ssc.UpdateStatefulSet(set, pods); !apierrors.IsInternalError(err) { t.Errorf("StatefulSet failed to %s", err) } - if err := assertInvariants(set, spc); err != nil { + if err := invariants(set, spc); err != nil { t.Error(err) } if err := ssc.UpdateStatefulSet(set, pods); err != nil { t.Errorf("Error updating StatefulSet %s", err) } - if err := assertInvariants(set, spc); err != nil { + if err := invariants(set, spc); err != nil { t.Error(err) } pods, err = spc.podsLister.Pods(set.Namespace).List(selector) @@ -564,23 +512,13 @@ func TestDefaultStatefulSetControlPodRecreateDeleteError(t *testing.T) { } func TestStatefulSetControlScaleDownDeleteError(t *testing.T) { + invariants := assertMonotonicInvariants set := newStatefulSet(3) client := fake.NewSimpleClientset(set) - - informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc()) - spc := newFakeStatefulPodControl(informerFactory.Core().V1().Pods(), informerFactory.Apps().V1beta1().StatefulSets()) - ssc := NewDefaultStatefulSetControl(spc) - - stop := make(chan struct{}) + spc, ssc, stop := setupController(client) defer close(stop) - informerFactory.Start(stop) - cache.WaitForCacheSync( - stop, - informerFactory.Apps().V1beta1().StatefulSets().Informer().HasSynced, - informerFactory.Core().V1().Pods().Informer().HasSynced, - ) - if err := scaleUpStatefulSetControl(set, ssc, spc); err != nil { + if err := scaleUpStatefulSetControl(t, set, ssc, spc, invariants); err != nil { t.Errorf("Failed to turn up StatefulSet : %s", err) } var err error @@ -590,10 +528,10 @@ func TestStatefulSetControlScaleDownDeleteError(t *testing.T) { } *set.Spec.Replicas = 0 spc.SetDeleteStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 2) - if err := scaleDownStatefulSetControl(set, ssc, spc); !apierrors.IsInternalError(err) { + if err := scaleDownStatefulSetControl(t, set, ssc, spc, invariants); !apierrors.IsInternalError(err) { t.Errorf("StatefulSetControl failed to throw error on delete %s", err) } - if err := scaleDownStatefulSetControl(set, ssc, spc); err != nil { + if err := scaleDownStatefulSetControl(t, set, ssc, spc, invariants); err != nil { t.Errorf("Failed to turn down StatefulSet %s", err) } set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) @@ -672,6 +610,14 @@ func (spc *fakeStatefulPodControl) SetUpdateStatefulSetStatusError(err error, af spc.updateStatusTracker.after = after } +func copyPod(pod *v1.Pod) *v1.Pod { + obj, err := api.Scheme.Copy(pod) + if err != nil { + panic(err) + } + return obj.(*v1.Pod) +} + func (spc *fakeStatefulPodControl) setPodPending(set *apps.StatefulSet, ordinal int) ([]*v1.Pod, error) { selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) if err != nil { @@ -685,7 +631,7 @@ func (spc *fakeStatefulPodControl) setPodPending(set *apps.StatefulSet, ordinal return nil, fmt.Errorf("ordinal %d out of range [0,%d)", ordinal, len(pods)) } sort.Sort(ascendingOrdinal(pods)) - pod := pods[ordinal] + pod := copyPod(pods[ordinal]) pod.Status.Phase = v1.PodPending fakeResourceVersion(pod) spc.podsIndexer.Update(pod) @@ -705,7 +651,7 @@ func (spc *fakeStatefulPodControl) setPodRunning(set *apps.StatefulSet, ordinal return nil, fmt.Errorf("ordinal %d out of range [0,%d)", ordinal, len(pods)) } sort.Sort(ascendingOrdinal(pods)) - pod := pods[ordinal] + pod := copyPod(pods[ordinal]) pod.Status.Phase = v1.PodRunning fakeResourceVersion(pod) spc.podsIndexer.Update(pod) @@ -725,7 +671,7 @@ func (spc *fakeStatefulPodControl) setPodReady(set *apps.StatefulSet, ordinal in return nil, fmt.Errorf("ordinal %d out of range [0,%d)", ordinal, len(pods)) } sort.Sort(ascendingOrdinal(pods)) - pod := pods[ordinal] + pod := copyPod(pods[ordinal]) condition := v1.PodCondition{Type: v1.PodReady, Status: v1.ConditionTrue} podutil.UpdatePodCondition(&pod.Status, &condition) fakeResourceVersion(pod) @@ -746,7 +692,7 @@ func (spc *fakeStatefulPodControl) setPodInitStatus(set *apps.StatefulSet, ordin return nil, fmt.Errorf("ordinal %d out of range [0,%d)", ordinal, len(pods)) } sort.Sort(ascendingOrdinal(pods)) - pod := pods[ordinal] + pod := copyPod(pods[ordinal]) if init { pod.Annotations[apps.StatefulSetInitAnnotation] = "true" } else { @@ -850,7 +796,7 @@ func (spc *fakeStatefulPodControl) UpdateStatefulSetStatus(set *apps.StatefulSet var _ StatefulPodControlInterface = &fakeStatefulPodControl{} -func assertInvariants(set *apps.StatefulSet, spc *fakeStatefulPodControl) error { +func assertMonotonicInvariants(set *apps.StatefulSet, spc *fakeStatefulPodControl) error { selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) if err != nil { return err @@ -862,31 +808,61 @@ func assertInvariants(set *apps.StatefulSet, spc *fakeStatefulPodControl) error sort.Sort(ascendingOrdinal(pods)) for ord := 0; ord < len(pods); ord++ { if ord > 0 && isRunningAndReady(pods[ord]) && !isRunningAndReady(pods[ord-1]) { - return fmt.Errorf("Predecessor %s is Running and Ready while %s is not", - pods[ord-1].Name, - pods[ord].Name) + return fmt.Errorf("Successor %s is Running and Ready while %s is not", pods[ord].Name, pods[ord-1].Name) } + if getOrdinal(pods[ord]) != ord { - return fmt.Errorf("Pods %s deployed in the wrong order", - pods[ord].Name) + return fmt.Errorf("pods %s deployed in the wrong order", pods[ord].Name) } + if !storageMatches(set, pods[ord]) { - return fmt.Errorf("Pods %s does not match the storage specification of StatefulSet %s ", - pods[ord]. - Name, set.Name) - } else { - for _, claim := range getPersistentVolumeClaims(set, pods[ord]) { - if claim, err := spc.claimsLister.PersistentVolumeClaims(set.Namespace).Get(claim.Name); err != nil { - return err - } else if claim == nil { - return fmt.Errorf("claim %s for Pod %s was not created", - claim.Name, - pods[ord].Name) - } + return fmt.Errorf("pods %s does not match the storage specification of StatefulSet %s ", pods[ord].Name, set.Name) + } + + for _, claim := range getPersistentVolumeClaims(set, pods[ord]) { + claim, err := spc.claimsLister.PersistentVolumeClaims(set.Namespace).Get(claim.Name) + if err != nil { + return err + } + if claim == nil { + return fmt.Errorf("claim %s for Pod %s was not created", claim.Name, pods[ord].Name) } } + if !identityMatches(set, pods[ord]) { - return fmt.Errorf("Pods %s does not match the identity specification of StatefulSet %s ", + return fmt.Errorf("pods %s does not match the identity specification of StatefulSet %s ", pods[ord].Name, set.Name) + } + } + return nil +} + +func assertBurstInvariants(set *apps.StatefulSet, spc *fakeStatefulPodControl) error { + selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) + if err != nil { + return err + } + pods, err := spc.podsLister.Pods(set.Namespace).List(selector) + if err != nil { + return err + } + sort.Sort(ascendingOrdinal(pods)) + for ord := 0; ord < len(pods); ord++ { + if !storageMatches(set, pods[ord]) { + return fmt.Errorf("pods %s does not match the storage specification of StatefulSet %s ", pods[ord].Name, set.Name) + } + + for _, claim := range getPersistentVolumeClaims(set, pods[ord]) { + claim, err := spc.claimsLister.PersistentVolumeClaims(set.Namespace).Get(claim.Name) + if err != nil { + return err + } + if claim == nil { + return fmt.Errorf("claim %s for Pod %s was not created", claim.Name, pods[ord].Name) + } + } + + if !identityMatches(set, pods[ord]) { + return fmt.Errorf("pods %s does not match the identity specification of StatefulSet %s ", pods[ord].Name, set.Name) } @@ -898,14 +874,15 @@ func fakeResourceVersion(object interface{}) { obj, isObj := object.(metav1.Object) if !isObj { return - } else if version := obj.GetResourceVersion(); version == "" { + } + if version := obj.GetResourceVersion(); version == "" { obj.SetResourceVersion("1") } else if intValue, err := strconv.ParseInt(version, 10, 32); err == nil { obj.SetResourceVersion(strconv.FormatInt(intValue+1, 10)) } } -func scaleUpStatefulSetControl(set *apps.StatefulSet, ssc StatefulSetControlInterface, spc *fakeStatefulPodControl) error { +func scaleUpStatefulSetControl(t *testing.T, set *apps.StatefulSet, ssc StatefulSetControlInterface, spc *fakeStatefulPodControl, invariants invariantFunc) error { selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) if err != nil { return err @@ -915,47 +892,59 @@ func scaleUpStatefulSetControl(set *apps.StatefulSet, ssc StatefulSetControlInte if err != nil { return err } - if ord := len(pods) - 1; ord >= 0 { - ord := len(pods) - 1 - if pods, err = spc.setPodPending(set, ord); err != nil { - return err - } - if err = ssc.UpdateStatefulSet(set, pods); err != nil { - return err - } - set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) - if err != nil { - return err - } - if pods, err = spc.setPodRunning(set, ord); err != nil { - return err - } - if err = ssc.UpdateStatefulSet(set, pods); err != nil { - return err - } - set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) - if err != nil { - return err - } - if pods, err = spc.setPodReady(set, ord); err != nil { - return err + sort.Sort(ascendingOrdinal(pods)) + + // ensure all pods are valid (have a phase) + initialized := false + for ord, pod := range pods { + if pod.Status.Phase == "" { + t.Logf("found pod %s pending", pod.Name) + if pods, err = spc.setPodPending(set, ord); err != nil { + return err + } + break } } - if err := ssc.UpdateStatefulSet(set, pods); err != nil { + if initialized { + continue + } + + // select one of the pods and move it forward in status + if len(pods) > 0 { + ord := int(rand.Int63n(int64(len(pods)))) + pod := pods[ord] + switch pod.Status.Phase { + case v1.PodPending: + t.Logf("set pod %s running", pod.Name) + if pods, err = spc.setPodRunning(set, ord); err != nil { + return err + } + case v1.PodRunning: + t.Logf("set pod %s ready", pod.Name) + if pods, err = spc.setPodReady(set, ord); err != nil { + return err + } + default: + continue + } + } + + // run the controller once and check invariants + if err = ssc.UpdateStatefulSet(set, pods); err != nil { return err } set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) if err != nil { return err } - if err := assertInvariants(set, spc); err != nil { + if err := invariants(set, spc); err != nil { return err } } - return assertInvariants(set, spc) + return invariants(set, spc) } -func scaleDownStatefulSetControl(set *apps.StatefulSet, ssc StatefulSetControlInterface, spc *fakeStatefulPodControl) error { +func scaleDownStatefulSetControl(t *testing.T, set *apps.StatefulSet, ssc StatefulSetControlInterface, spc *fakeStatefulPodControl, invariants invariantFunc) error { selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) if err != nil { return err @@ -965,6 +954,7 @@ func scaleDownStatefulSetControl(set *apps.StatefulSet, ssc StatefulSetControlIn if err != nil { return err } + sort.Sort(ascendingOrdinal(pods)) if ordinal := len(pods) - 1; ordinal >= 0 { if err := ssc.UpdateStatefulSet(set, pods); err != nil { return err @@ -997,9 +987,9 @@ func scaleDownStatefulSetControl(set *apps.StatefulSet, ssc StatefulSetControlIn if err != nil { return err } - if err := assertInvariants(set, spc); err != nil { + if err := invariants(set, spc); err != nil { return err } } - return assertInvariants(set, spc) + return invariants(set, spc) } diff --git a/pkg/controller/statefulset/stateful_set_test.go b/pkg/controller/statefulset/stateful_set_test.go index 1f73b051268..c867cf4bbe3 100644 --- a/pkg/controller/statefulset/stateful_set_test.go +++ b/pkg/controller/statefulset/stateful_set_test.go @@ -640,7 +640,7 @@ func scaleUpStatefulSetController(set *apps.StatefulSet, ssc *StatefulSetControl pod = getPodAtOrdinal(pods, ord) ssc.updatePod(&prev, pod) fakeWorker(ssc) - if err := assertInvariants(set, spc); err != nil { + if err := assertMonotonicInvariants(set, spc); err != nil { return err } if obj, _, err := spc.setsIndexer.Get(set); err != nil { @@ -650,7 +650,7 @@ func scaleUpStatefulSetController(set *apps.StatefulSet, ssc *StatefulSetControl } } - return assertInvariants(set, spc) + return assertMonotonicInvariants(set, spc) } func scaleDownStatefulSetController(set *apps.StatefulSet, ssc *StatefulSetController, spc *fakeStatefulPodControl) error { @@ -692,5 +692,5 @@ func scaleDownStatefulSetController(set *apps.StatefulSet, ssc *StatefulSetContr set = obj.(*apps.StatefulSet) } } - return assertInvariants(set, spc) + return assertMonotonicInvariants(set, spc) } diff --git a/pkg/controller/statefulset/stateful_set_utils.go b/pkg/controller/statefulset/stateful_set_utils.go index 88bc8eb102f..198da850da8 100644 --- a/pkg/controller/statefulset/stateful_set_utils.go +++ b/pkg/controller/statefulset/stateful_set_utils.go @@ -229,7 +229,7 @@ func isHealthy(pod *v1.Pod) bool { // allowsBurst is true if the alpha burst annotation is set. func allowsBurst(set *apps.StatefulSet) bool { - return set.Annotations[apps.StatefulSetBurstAnnotation] == "true" + return set.Spec.PodManagementPolicy == apps.ParallelPodManagement } // newControllerRef returns an ControllerRef pointing to a given StatefulSet.