Merge pull request #44899 from smarterclayton/burst

Automatic merge from submit-queue (batch tested with PRs 38990, 45781, 46225, 44899, 43663)

Support parallel scaling on StatefulSets

Fixes #41255

```release-note
StatefulSets now include an alpha scaling feature accessible by setting the `spec.podManagementPolicy` field to `Parallel`.  The controller will not wait for pods to be ready before adding the other pods, and will replace deleted pods as needed.  Since parallel scaling creates pods out of order, you cannot depend on predictable membership changes within your set.
```
This commit is contained in:
Kubernetes Submit Queue
2017-05-22 19:07:09 -07:00
committed by GitHub
34 changed files with 1162 additions and 651 deletions

View File

@@ -56,9 +56,11 @@ go_test(
library = ":go_default_library",
tags = ["automanaged"],
deps = [
"//pkg/api:go_default_library",
"//pkg/api/v1:go_default_library",
"//pkg/api/v1/pod:go_default_library",
"//pkg/apis/apps/v1beta1:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/client/clientset_generated/clientset/fake:go_default_library",
"//pkg/client/informers/informers_generated/externalversions:go_default_library",
"//pkg/client/informers/informers_generated/externalversions/apps/v1beta1:go_default_library",

View File

@@ -50,6 +50,12 @@ type defaultStatefulSetControl struct {
podControl StatefulPodControlInterface
}
// UpdateStatefulSet executes the core logic loop for a stateful set, applying the predictable and
// consistent monotonic update strategy by default - scale up proceeds in ordinal order, no new pod
// is created while any pod is unhealthy, and pods are terminated in descending order. The burst
// strategy allows these constraints to be relaxed - pods will be created and deleted eagerly and
// in no particular order. Clients using the burst strategy should be careful to ensure they
// understand the consistency implications of having unpredictable numbers of pods available.
func (ssc *defaultStatefulSetControl) UpdateStatefulSet(set *apps.StatefulSet, pods []*v1.Pod) error {
replicaCount := int(*set.Spec.Replicas)
// slice that will contain all Pods such that 0 <= getOrdinal(pod) < set.Spec.Replicas
@@ -118,6 +124,8 @@ func (ssc *defaultStatefulSetControl) UpdateStatefulSet(set *apps.StatefulSet, p
return nil
}
monotonic := !allowsBurst(set)
// Examine each replica with respect to its ordinal
for i := range replicas {
// delete and recreate failed pods
@@ -128,23 +136,29 @@ func (ssc *defaultStatefulSetControl) UpdateStatefulSet(set *apps.StatefulSet, p
}
replicas[i] = newStatefulSetPod(set, i)
}
// If we find a Pod that has not been created we create the Pod immediately and return
// If we find a Pod that has not been created we create the Pod
if !isCreated(replicas[i]) {
return ssc.podControl.CreateStatefulPod(set, replicas[i])
if err := ssc.podControl.CreateStatefulPod(set, replicas[i]); err != nil {
return err
}
// if the set does not allow bursting, return immediately
if monotonic {
return nil
}
// pod created, no more work possible for this round
continue
}
// If we find a Pod that is currently terminating, we must wait until graceful deletion
// completes before we continue to make progress.
if isTerminating(replicas[i]) {
glog.V(2).Infof("StatefulSet %s is waiting for Pod %s to Terminate",
set.Name, replicas[i].Name)
// completes before we continue to make progress.
if isTerminating(replicas[i]) && monotonic {
glog.V(2).Infof("StatefulSet %s is waiting for Pod %s to Terminate", set.Name, replicas[i].Name)
return nil
}
// If we have a Pod that has been created but is not running and ready we cannot make progress.
// We must ensure that, for each Pod, when we create it, all of its predecessors, with respect to its
// ordinal, are Running and Ready.
if !isRunningAndReady(replicas[i]) {
glog.V(2).Infof("StatefulSet %s is waiting for Pod %s to be Running and Ready",
set.Name, replicas[i].Name)
if !isRunningAndReady(replicas[i]) && monotonic {
glog.V(2).Infof("StatefulSet %s is waiting for Pod %s to be Running and Ready", set.Name, replicas[i].Name)
return nil
}
// Enforce the StatefulSet invariants
@@ -166,13 +180,18 @@ func (ssc *defaultStatefulSetControl) UpdateStatefulSet(set *apps.StatefulSet, p
// We will wait for all predecessors to be Running and Ready prior to attempting a deletion.
// We will terminate Pods in a monotonically decreasing order over [len(pods),set.Spec.Replicas).
// Note that we do not resurrect Pods in this interval.
if unhealthy > 0 {
if unhealthy > 0 && monotonic {
glog.V(2).Infof("StatefulSet %s is waiting on %d Pods", set.Name, unhealthy)
return nil
}
if target := len(condemned) - 1; target >= 0 {
for target := len(condemned) - 1; target >= 0; target-- {
glog.V(2).Infof("StatefulSet %s terminating Pod %s", set.Name, condemned[target])
return ssc.podControl.DeleteStatefulPod(set, condemned[target])
if err := ssc.podControl.DeleteStatefulPod(set, condemned[target]); err != nil {
return err
}
if monotonic {
return nil
}
}
return nil
}

View File

@@ -19,8 +19,12 @@ package statefulset
import (
"errors"
"fmt"
"math/rand"
"reflect"
"runtime"
"sort"
"strconv"
"strings"
"testing"
"time"
@@ -28,9 +32,11 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/tools/cache"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions"
appsinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/apps/v1beta1"
@@ -40,24 +46,75 @@ import (
"k8s.io/kubernetes/pkg/controller"
)
func TestDefaultStatefulSetControlCreatesPods(t *testing.T) {
set := newStatefulSet(3)
client := fake.NewSimpleClientset(set)
type invariantFunc func(set *apps.StatefulSet, spc *fakeStatefulPodControl) error
// setupController wires up the test fixtures shared by the StatefulSet control tests. It creates
// a shared informer factory for client, builds the fake pod control and the default StatefulSet
// control on top of it, starts the informers, and waits for the StatefulSet and Pod informer
// caches to sync.
//
// It returns the fake pod control, the control under test, and the stop channel passed to the
// informers. The stop channel is returned open: the caller owns it and is expected to close it
// (typically with defer) once the test is done.
func setupController(client clientset.Interface) (*fakeStatefulPodControl, StatefulSetControlInterface, chan struct{}) {
	informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
	spc := newFakeStatefulPodControl(informerFactory.Core().V1().Pods(), informerFactory.Apps().V1beta1().StatefulSets())
	ssc := NewDefaultStatefulSetControl(spc)

	// Do not close stop in this function. It is handed back to the caller, who closes it;
	// closing it here as well would signal the informers to stop as soon as setup returns and
	// would make the caller's close(stop) panic on an already-closed channel.
	stop := make(chan struct{})
	informerFactory.Start(stop)
	cache.WaitForCacheSync(
		stop,
		informerFactory.Apps().V1beta1().StatefulSets().Informer().HasSynced,
		informerFactory.Core().V1().Pods().Informer().HasSynced,
	)
	return spc, ssc, stop
}
if err := scaleUpStatefulSetControl(set, ssc, spc); err != nil {
// burst sets the PodManagementPolicy of set to apps.ParallelPodManagement and returns the same
// pointer so the call can be used inline. The set is modified in place; no copy is made.
func burst(set *apps.StatefulSet) *apps.StatefulSet {
set.Spec.PodManagementPolicy = apps.ParallelPodManagement
return set
}
// TestStatefulSetControl runs each shared scenario twice as subtests. The "<name>/Monotonic"
// subtest passes a freshly built set together with assertMonotonicInvariants; the "<name>/Burst"
// subtest passes a freshly built set run through burst together with assertBurstInvariants.
func TestStatefulSetControl(t *testing.T) {
	threeReplicas := func() *apps.StatefulSet { return newStatefulSet(3) }
	fiveReplicas := func() *apps.StatefulSet { return newStatefulSet(5) }

	// scenario pairs a test body with the constructor for the set it runs against.
	type scenario struct {
		fn  func(*testing.T, *apps.StatefulSet, invariantFunc)
		obj func() *apps.StatefulSet
	}
	scenarios := []scenario{
		{CreatesPods, threeReplicas},
		{ScalesUp, threeReplicas},
		{ScalesDown, threeReplicas},
		{ReplacesPods, fiveReplicas},
		{RecreatesFailedPod, threeReplicas},
		{SetsInitAnnotation, threeReplicas},
		{CreatePodFailure, threeReplicas},
		{UpdatePodFailure, threeReplicas},
		{UpdateSetStatusFailure, threeReplicas},
		{PodRecreateDeleteFailure, threeReplicas},
	}

	for _, sc := range scenarios {
		// Name the subtests after the scenario function itself, dropping everything up to and
		// including the last "." in its runtime name. When there is no ".", LastIndex yields -1
		// and the slice keeps the whole name.
		name := runtime.FuncForPC(reflect.ValueOf(sc.fn).Pointer()).Name()
		name = name[strings.LastIndex(name, ".")+1:]

		t.Run(name+"/Monotonic", func(t *testing.T) {
			sc.fn(t, sc.obj(), assertMonotonicInvariants)
		})
		t.Run(name+"/Burst", func(t *testing.T) {
			sc.fn(t, burst(sc.obj()), assertBurstInvariants)
		})
	}
}
func CreatesPods(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
client := fake.NewSimpleClientset(set)
spc, ssc, stop := setupController(client)
defer close(stop)
if err := scaleUpStatefulSetControl(t, set, ssc, spc, invariants); err != nil {
t.Errorf("Failed to turn up StatefulSet : %s", err)
}
var err error
@@ -70,28 +127,16 @@ func TestDefaultStatefulSetControlCreatesPods(t *testing.T) {
}
}
func TestStatefulSetControlScaleUp(t *testing.T) {
set := newStatefulSet(3)
func ScalesUp(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
client := fake.NewSimpleClientset(set)
informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
spc := newFakeStatefulPodControl(informerFactory.Core().V1().Pods(), informerFactory.Apps().V1beta1().StatefulSets())
ssc := NewDefaultStatefulSetControl(spc)
stop := make(chan struct{})
spc, ssc, stop := setupController(client)
defer close(stop)
informerFactory.Start(stop)
cache.WaitForCacheSync(
stop,
informerFactory.Apps().V1beta1().StatefulSets().Informer().HasSynced,
informerFactory.Core().V1().Pods().Informer().HasSynced,
)
if err := scaleUpStatefulSetControl(set, ssc, spc); err != nil {
if err := scaleUpStatefulSetControl(t, set, ssc, spc, invariants); err != nil {
t.Errorf("Failed to turn up StatefulSet : %s", err)
}
*set.Spec.Replicas = 4
if err := scaleUpStatefulSetControl(set, ssc, spc); err != nil {
if err := scaleUpStatefulSetControl(t, set, ssc, spc, invariants); err != nil {
t.Errorf("Failed to scale StatefulSet : %s", err)
}
var err error
@@ -104,28 +149,16 @@ func TestStatefulSetControlScaleUp(t *testing.T) {
}
}
func TestStatefulSetControlScaleDown(t *testing.T) {
set := newStatefulSet(3)
func ScalesDown(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
client := fake.NewSimpleClientset(set)
informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
spc := newFakeStatefulPodControl(informerFactory.Core().V1().Pods(), informerFactory.Apps().V1beta1().StatefulSets())
ssc := NewDefaultStatefulSetControl(spc)
stop := make(chan struct{})
spc, ssc, stop := setupController(client)
defer close(stop)
informerFactory.Start(stop)
cache.WaitForCacheSync(
stop,
informerFactory.Apps().V1beta1().StatefulSets().Informer().HasSynced,
informerFactory.Core().V1().Pods().Informer().HasSynced,
)
if err := scaleUpStatefulSetControl(set, ssc, spc); err != nil {
if err := scaleUpStatefulSetControl(t, set, ssc, spc, invariants); err != nil {
t.Errorf("Failed to turn up StatefulSet : %s", err)
}
*set.Spec.Replicas = 0
if err := scaleDownStatefulSetControl(set, ssc, spc); err != nil {
if err := scaleDownStatefulSetControl(t, set, ssc, spc, invariants); err != nil {
t.Errorf("Failed to scale StatefulSet : %s", err)
}
if set.Status.Replicas != 0 {
@@ -133,24 +166,12 @@ func TestStatefulSetControlScaleDown(t *testing.T) {
}
}
func TestStatefulSetControlReplacesPods(t *testing.T) {
set := newStatefulSet(5)
func ReplacesPods(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
client := fake.NewSimpleClientset(set)
informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
spc := newFakeStatefulPodControl(informerFactory.Core().V1().Pods(), informerFactory.Apps().V1beta1().StatefulSets())
ssc := NewDefaultStatefulSetControl(spc)
stop := make(chan struct{})
spc, ssc, stop := setupController(client)
defer close(stop)
informerFactory.Start(stop)
cache.WaitForCacheSync(
stop,
informerFactory.Apps().V1beta1().StatefulSets().Informer().HasSynced,
informerFactory.Core().V1().Pods().Informer().HasSynced,
)
if err := scaleUpStatefulSetControl(set, ssc, spc); err != nil {
if err := scaleUpStatefulSetControl(t, set, ssc, spc, invariants); err != nil {
t.Errorf("Failed to turn up StatefulSet : %s", err)
}
var err error
@@ -215,12 +236,11 @@ func TestStatefulSetControlReplacesPods(t *testing.T) {
}
}
func TestDefaultStatefulSetControlRecreatesFailedPod(t *testing.T) {
func RecreatesFailedPod(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
client := fake.NewSimpleClientset()
informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
spc := newFakeStatefulPodControl(informerFactory.Core().V1().Pods(), informerFactory.Apps().V1beta1().StatefulSets())
ssc := NewDefaultStatefulSetControl(spc)
set := newStatefulSet(3)
selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
if err != nil {
t.Error(err)
@@ -232,7 +252,7 @@ func TestDefaultStatefulSetControlRecreatesFailedPod(t *testing.T) {
if err := ssc.UpdateStatefulSet(set, pods); err != nil {
t.Errorf("Error updating StatefulSet %s", err)
}
if err := assertInvariants(set, spc); err != nil {
if err := invariants(set, spc); err != nil {
t.Error(err)
}
pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
@@ -244,7 +264,7 @@ func TestDefaultStatefulSetControlRecreatesFailedPod(t *testing.T) {
if err := ssc.UpdateStatefulSet(set, pods); err != nil {
t.Errorf("Error updating StatefulSet %s", err)
}
if err := assertInvariants(set, spc); err != nil {
if err := invariants(set, spc); err != nil {
t.Error(err)
}
pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
@@ -256,22 +276,10 @@ func TestDefaultStatefulSetControlRecreatesFailedPod(t *testing.T) {
}
}
func TestDefaultStatefulSetControlInitAnnotation(t *testing.T) {
set := newStatefulSet(3)
func SetsInitAnnotation(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
client := fake.NewSimpleClientset(set)
informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
spc := newFakeStatefulPodControl(informerFactory.Core().V1().Pods(), informerFactory.Apps().V1beta1().StatefulSets())
ssc := NewDefaultStatefulSetControl(spc)
stop := make(chan struct{})
spc, ssc, stop := setupController(client)
defer close(stop)
informerFactory.Start(stop)
cache.WaitForCacheSync(
stop,
informerFactory.Apps().V1beta1().StatefulSets().Informer().HasSynced,
informerFactory.Core().V1().Pods().Informer().HasSynced,
)
selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
if err != nil {
@@ -284,7 +292,7 @@ func TestDefaultStatefulSetControlInitAnnotation(t *testing.T) {
if err = ssc.UpdateStatefulSet(set, pods); err != nil {
t.Errorf("Error updating StatefulSet %s", err)
}
if err = assertInvariants(set, spc); err != nil {
if err = invariants(set, spc); err != nil {
t.Error(err)
}
if pods, err = spc.setPodRunning(set, 0); err != nil {
@@ -300,7 +308,7 @@ func TestDefaultStatefulSetControlInitAnnotation(t *testing.T) {
if err := ssc.UpdateStatefulSet(set, pods); err != nil {
t.Errorf("Error updating StatefulSet %s", err)
}
if err := assertInvariants(set, spc); err != nil {
if err := invariants(set, spc); err != nil {
t.Error(err)
}
if replicas != int(set.Status.Replicas) {
@@ -309,7 +317,7 @@ func TestDefaultStatefulSetControlInitAnnotation(t *testing.T) {
if pods, err = spc.setPodInitStatus(set, 0, true); err != nil {
t.Error(err)
}
if err := scaleUpStatefulSetControl(set, ssc, spc); err != nil {
if err := scaleUpStatefulSetControl(t, set, ssc, spc, invariants); err != nil {
t.Errorf("Failed to turn up StatefulSet : %s", err)
}
set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
@@ -321,28 +329,16 @@ func TestDefaultStatefulSetControlInitAnnotation(t *testing.T) {
}
}
func TestDefaultStatefulSetControlCreatePodFailure(t *testing.T) {
set := newStatefulSet(3)
func CreatePodFailure(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
client := fake.NewSimpleClientset(set)
informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
spc := newFakeStatefulPodControl(informerFactory.Core().V1().Pods(), informerFactory.Apps().V1beta1().StatefulSets())
ssc := NewDefaultStatefulSetControl(spc)
spc, ssc, stop := setupController(client)
defer close(stop)
spc.SetCreateStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 2)
stop := make(chan struct{})
defer close(stop)
informerFactory.Start(stop)
cache.WaitForCacheSync(
stop,
informerFactory.Apps().V1beta1().StatefulSets().Informer().HasSynced,
informerFactory.Core().V1().Pods().Informer().HasSynced,
)
if err := scaleUpStatefulSetControl(set, ssc, spc); !apierrors.IsInternalError(err) {
if err := scaleUpStatefulSetControl(t, set, ssc, spc, invariants); !apierrors.IsInternalError(err) {
t.Errorf("StatefulSetControl did not return InternalError found %s", err)
}
if err := scaleUpStatefulSetControl(set, ssc, spc); err != nil {
if err := scaleUpStatefulSetControl(t, set, ssc, spc, invariants); err != nil {
t.Errorf("Failed to turn up StatefulSet : %s", err)
}
var err error
@@ -355,26 +351,14 @@ func TestDefaultStatefulSetControlCreatePodFailure(t *testing.T) {
}
}
func TestDefaultStatefulSetControlUpdatePodFailure(t *testing.T) {
set := newStatefulSet(3)
func UpdatePodFailure(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
client := fake.NewSimpleClientset(set)
informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
spc := newFakeStatefulPodControl(informerFactory.Core().V1().Pods(), informerFactory.Apps().V1beta1().StatefulSets())
ssc := NewDefaultStatefulSetControl(spc)
spc, ssc, stop := setupController(client)
defer close(stop)
spc.SetUpdateStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 0)
stop := make(chan struct{})
defer close(stop)
informerFactory.Start(stop)
cache.WaitForCacheSync(
stop,
informerFactory.Apps().V1beta1().StatefulSets().Informer().HasSynced,
informerFactory.Core().V1().Pods().Informer().HasSynced,
)
// have to have 1 successful loop first
if err := scaleUpStatefulSetControl(set, ssc, spc); err != nil {
if err := scaleUpStatefulSetControl(t, set, ssc, spc, invariants); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
var err error
@@ -404,25 +388,13 @@ func TestDefaultStatefulSetControlUpdatePodFailure(t *testing.T) {
}
}
func TestDefaultStatefulSetControlBlocksOnTerminating(t *testing.T) {
set := newStatefulSet(3)
func testDefaultStatefulSetControlBlocksOnTerminating(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
client := fake.NewSimpleClientset(set)
informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
spc := newFakeStatefulPodControl(informerFactory.Core().V1().Pods(), informerFactory.Apps().V1beta1().StatefulSets())
ssc := NewDefaultStatefulSetControl(spc)
spc, ssc, stop := setupController(client)
defer close(stop)
spc.SetUpdateStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 0)
stop := make(chan struct{})
defer close(stop)
informerFactory.Start(stop)
cache.WaitForCacheSync(
stop,
informerFactory.Apps().V1beta1().StatefulSets().Informer().HasSynced,
informerFactory.Core().V1().Pods().Informer().HasSynced,
)
if err := scaleUpStatefulSetControl(set, ssc, spc); err != nil {
if err := scaleUpStatefulSetControl(t, set, ssc, spc, invariants); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
var err error
@@ -458,7 +430,7 @@ func TestDefaultStatefulSetControlBlocksOnTerminating(t *testing.T) {
if len(pods) != 2 {
t.Fatalf("Expected 3 pods, got %d", len(pods))
}
if err := scaleUpStatefulSetControl(set, ssc, spc); err != nil {
if err := scaleUpStatefulSetControl(t, set, ssc, spc, invariants); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
@@ -470,28 +442,16 @@ func TestDefaultStatefulSetControlBlocksOnTerminating(t *testing.T) {
}
}
func TestDefaultStatefulSetControlUpdateSetStatusFailure(t *testing.T) {
set := newStatefulSet(3)
func UpdateSetStatusFailure(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
client := fake.NewSimpleClientset(set)
informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
spc := newFakeStatefulPodControl(informerFactory.Core().V1().Pods(), informerFactory.Apps().V1beta1().StatefulSets())
ssc := NewDefaultStatefulSetControl(spc)
spc, ssc, stop := setupController(client)
defer close(stop)
spc.SetUpdateStatefulSetStatusError(apierrors.NewInternalError(errors.New("API server failed")), 2)
stop := make(chan struct{})
defer close(stop)
informerFactory.Start(stop)
cache.WaitForCacheSync(
stop,
informerFactory.Apps().V1beta1().StatefulSets().Informer().HasSynced,
informerFactory.Core().V1().Pods().Informer().HasSynced,
)
if err := scaleUpStatefulSetControl(set, ssc, spc); !apierrors.IsInternalError(err) {
if err := scaleUpStatefulSetControl(t, set, ssc, spc, invariants); !apierrors.IsInternalError(err) {
t.Errorf("StatefulSetControl did not return InternalError found %s", err)
}
if err := scaleUpStatefulSetControl(set, ssc, spc); err != nil {
if err := scaleUpStatefulSetControl(t, set, ssc, spc, invariants); err != nil {
t.Errorf("Failed to turn up StatefulSet : %s", err)
}
var err error
@@ -504,22 +464,10 @@ func TestDefaultStatefulSetControlUpdateSetStatusFailure(t *testing.T) {
}
}
func TestDefaultStatefulSetControlPodRecreateDeleteError(t *testing.T) {
set := newStatefulSet(3)
func PodRecreateDeleteFailure(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
client := fake.NewSimpleClientset(set)
informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
spc := newFakeStatefulPodControl(informerFactory.Core().V1().Pods(), informerFactory.Apps().V1beta1().StatefulSets())
ssc := NewDefaultStatefulSetControl(spc)
stop := make(chan struct{})
spc, ssc, stop := setupController(client)
defer close(stop)
informerFactory.Start(stop)
cache.WaitForCacheSync(
stop,
informerFactory.Apps().V1beta1().StatefulSets().Informer().HasSynced,
informerFactory.Core().V1().Pods().Informer().HasSynced,
)
selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
if err != nil {
@@ -532,7 +480,7 @@ func TestDefaultStatefulSetControlPodRecreateDeleteError(t *testing.T) {
if err := ssc.UpdateStatefulSet(set, pods); err != nil {
t.Errorf("Error updating StatefulSet %s", err)
}
if err := assertInvariants(set, spc); err != nil {
if err := invariants(set, spc); err != nil {
t.Error(err)
}
pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
@@ -545,13 +493,13 @@ func TestDefaultStatefulSetControlPodRecreateDeleteError(t *testing.T) {
if err := ssc.UpdateStatefulSet(set, pods); !apierrors.IsInternalError(err) {
t.Errorf("StatefulSet failed to %s", err)
}
if err := assertInvariants(set, spc); err != nil {
if err := invariants(set, spc); err != nil {
t.Error(err)
}
if err := ssc.UpdateStatefulSet(set, pods); err != nil {
t.Errorf("Error updating StatefulSet %s", err)
}
if err := assertInvariants(set, spc); err != nil {
if err := invariants(set, spc); err != nil {
t.Error(err)
}
pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
@@ -564,23 +512,13 @@ func TestDefaultStatefulSetControlPodRecreateDeleteError(t *testing.T) {
}
func TestStatefulSetControlScaleDownDeleteError(t *testing.T) {
invariants := assertMonotonicInvariants
set := newStatefulSet(3)
client := fake.NewSimpleClientset(set)
informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
spc := newFakeStatefulPodControl(informerFactory.Core().V1().Pods(), informerFactory.Apps().V1beta1().StatefulSets())
ssc := NewDefaultStatefulSetControl(spc)
stop := make(chan struct{})
spc, ssc, stop := setupController(client)
defer close(stop)
informerFactory.Start(stop)
cache.WaitForCacheSync(
stop,
informerFactory.Apps().V1beta1().StatefulSets().Informer().HasSynced,
informerFactory.Core().V1().Pods().Informer().HasSynced,
)
if err := scaleUpStatefulSetControl(set, ssc, spc); err != nil {
if err := scaleUpStatefulSetControl(t, set, ssc, spc, invariants); err != nil {
t.Errorf("Failed to turn up StatefulSet : %s", err)
}
var err error
@@ -590,10 +528,10 @@ func TestStatefulSetControlScaleDownDeleteError(t *testing.T) {
}
*set.Spec.Replicas = 0
spc.SetDeleteStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 2)
if err := scaleDownStatefulSetControl(set, ssc, spc); !apierrors.IsInternalError(err) {
if err := scaleDownStatefulSetControl(t, set, ssc, spc, invariants); !apierrors.IsInternalError(err) {
t.Errorf("StatefulSetControl failed to throw error on delete %s", err)
}
if err := scaleDownStatefulSetControl(set, ssc, spc); err != nil {
if err := scaleDownStatefulSetControl(t, set, ssc, spc, invariants); err != nil {
t.Errorf("Failed to turn down StatefulSet %s", err)
}
set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
@@ -672,6 +610,14 @@ func (spc *fakeStatefulPodControl) SetUpdateStatefulSetStatusError(err error, af
spc.updateStatusTracker.after = after
}
// copyPod returns a copy of pod produced by api.Scheme.Copy, asserted back to *v1.Pod.
// It panics if the scheme reports an error, so it is only suitable for test fixtures.
func copyPod(pod *v1.Pod) *v1.Pod {
	copied, err := api.Scheme.Copy(pod)
	if err == nil {
		return copied.(*v1.Pod)
	}
	panic(err)
}
func (spc *fakeStatefulPodControl) setPodPending(set *apps.StatefulSet, ordinal int) ([]*v1.Pod, error) {
selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
if err != nil {
@@ -685,7 +631,7 @@ func (spc *fakeStatefulPodControl) setPodPending(set *apps.StatefulSet, ordinal
return nil, fmt.Errorf("ordinal %d out of range [0,%d)", ordinal, len(pods))
}
sort.Sort(ascendingOrdinal(pods))
pod := pods[ordinal]
pod := copyPod(pods[ordinal])
pod.Status.Phase = v1.PodPending
fakeResourceVersion(pod)
spc.podsIndexer.Update(pod)
@@ -705,7 +651,7 @@ func (spc *fakeStatefulPodControl) setPodRunning(set *apps.StatefulSet, ordinal
return nil, fmt.Errorf("ordinal %d out of range [0,%d)", ordinal, len(pods))
}
sort.Sort(ascendingOrdinal(pods))
pod := pods[ordinal]
pod := copyPod(pods[ordinal])
pod.Status.Phase = v1.PodRunning
fakeResourceVersion(pod)
spc.podsIndexer.Update(pod)
@@ -725,7 +671,7 @@ func (spc *fakeStatefulPodControl) setPodReady(set *apps.StatefulSet, ordinal in
return nil, fmt.Errorf("ordinal %d out of range [0,%d)", ordinal, len(pods))
}
sort.Sort(ascendingOrdinal(pods))
pod := pods[ordinal]
pod := copyPod(pods[ordinal])
condition := v1.PodCondition{Type: v1.PodReady, Status: v1.ConditionTrue}
podutil.UpdatePodCondition(&pod.Status, &condition)
fakeResourceVersion(pod)
@@ -746,7 +692,7 @@ func (spc *fakeStatefulPodControl) setPodInitStatus(set *apps.StatefulSet, ordin
return nil, fmt.Errorf("ordinal %d out of range [0,%d)", ordinal, len(pods))
}
sort.Sort(ascendingOrdinal(pods))
pod := pods[ordinal]
pod := copyPod(pods[ordinal])
if init {
pod.Annotations[apps.StatefulSetInitAnnotation] = "true"
} else {
@@ -850,7 +796,7 @@ func (spc *fakeStatefulPodControl) UpdateStatefulSetStatus(set *apps.StatefulSet
var _ StatefulPodControlInterface = &fakeStatefulPodControl{}
func assertInvariants(set *apps.StatefulSet, spc *fakeStatefulPodControl) error {
func assertMonotonicInvariants(set *apps.StatefulSet, spc *fakeStatefulPodControl) error {
selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
if err != nil {
return err
@@ -862,31 +808,61 @@ func assertInvariants(set *apps.StatefulSet, spc *fakeStatefulPodControl) error
sort.Sort(ascendingOrdinal(pods))
for ord := 0; ord < len(pods); ord++ {
if ord > 0 && isRunningAndReady(pods[ord]) && !isRunningAndReady(pods[ord-1]) {
return fmt.Errorf("Predecessor %s is Running and Ready while %s is not",
pods[ord-1].Name,
pods[ord].Name)
return fmt.Errorf("Successor %s is Running and Ready while %s is not", pods[ord].Name, pods[ord-1].Name)
}
if getOrdinal(pods[ord]) != ord {
return fmt.Errorf("Pods %s deployed in the wrong order",
pods[ord].Name)
return fmt.Errorf("pods %s deployed in the wrong order", pods[ord].Name)
}
if !storageMatches(set, pods[ord]) {
return fmt.Errorf("Pods %s does not match the storage specification of StatefulSet %s ",
pods[ord].
Name, set.Name)
} else {
for _, claim := range getPersistentVolumeClaims(set, pods[ord]) {
if claim, err := spc.claimsLister.PersistentVolumeClaims(set.Namespace).Get(claim.Name); err != nil {
return err
} else if claim == nil {
return fmt.Errorf("claim %s for Pod %s was not created",
claim.Name,
pods[ord].Name)
}
return fmt.Errorf("pods %s does not match the storage specification of StatefulSet %s ", pods[ord].Name, set.Name)
}
for _, claim := range getPersistentVolumeClaims(set, pods[ord]) {
claim, err := spc.claimsLister.PersistentVolumeClaims(set.Namespace).Get(claim.Name)
if err != nil {
return err
}
if claim == nil {
return fmt.Errorf("claim %s for Pod %s was not created", claim.Name, pods[ord].Name)
}
}
if !identityMatches(set, pods[ord]) {
return fmt.Errorf("Pods %s does not match the identity specification of StatefulSet %s ",
return fmt.Errorf("pods %s does not match the identity specification of StatefulSet %s ", pods[ord].Name, set.Name)
}
}
return nil
}
func assertBurstInvariants(set *apps.StatefulSet, spc *fakeStatefulPodControl) error {
selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
if err != nil {
return err
}
pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
if err != nil {
return err
}
sort.Sort(ascendingOrdinal(pods))
for ord := 0; ord < len(pods); ord++ {
if !storageMatches(set, pods[ord]) {
return fmt.Errorf("pods %s does not match the storage specification of StatefulSet %s ", pods[ord].Name, set.Name)
}
for _, claim := range getPersistentVolumeClaims(set, pods[ord]) {
claim, err := spc.claimsLister.PersistentVolumeClaims(set.Namespace).Get(claim.Name)
if err != nil {
return err
}
if claim == nil {
return fmt.Errorf("claim %s for Pod %s was not created", claim.Name, pods[ord].Name)
}
}
if !identityMatches(set, pods[ord]) {
return fmt.Errorf("pods %s does not match the identity specification of StatefulSet %s ",
pods[ord].Name,
set.Name)
}
@@ -898,14 +874,15 @@ func fakeResourceVersion(object interface{}) {
obj, isObj := object.(metav1.Object)
if !isObj {
return
} else if version := obj.GetResourceVersion(); version == "" {
}
if version := obj.GetResourceVersion(); version == "" {
obj.SetResourceVersion("1")
} else if intValue, err := strconv.ParseInt(version, 10, 32); err == nil {
obj.SetResourceVersion(strconv.FormatInt(intValue+1, 10))
}
}
func scaleUpStatefulSetControl(set *apps.StatefulSet, ssc StatefulSetControlInterface, spc *fakeStatefulPodControl) error {
func scaleUpStatefulSetControl(t *testing.T, set *apps.StatefulSet, ssc StatefulSetControlInterface, spc *fakeStatefulPodControl, invariants invariantFunc) error {
selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
if err != nil {
return err
@@ -915,47 +892,59 @@ func scaleUpStatefulSetControl(set *apps.StatefulSet, ssc StatefulSetControlInte
if err != nil {
return err
}
if ord := len(pods) - 1; ord >= 0 {
ord := len(pods) - 1
if pods, err = spc.setPodPending(set, ord); err != nil {
return err
}
if err = ssc.UpdateStatefulSet(set, pods); err != nil {
return err
}
set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
if err != nil {
return err
}
if pods, err = spc.setPodRunning(set, ord); err != nil {
return err
}
if err = ssc.UpdateStatefulSet(set, pods); err != nil {
return err
}
set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
if err != nil {
return err
}
if pods, err = spc.setPodReady(set, ord); err != nil {
return err
sort.Sort(ascendingOrdinal(pods))
// ensure all pods are valid (have a phase)
initialized := false
for ord, pod := range pods {
if pod.Status.Phase == "" {
t.Logf("found pod %s pending", pod.Name)
if pods, err = spc.setPodPending(set, ord); err != nil {
return err
}
break
}
}
if err := ssc.UpdateStatefulSet(set, pods); err != nil {
if initialized {
continue
}
// select one of the pods and move it forward in status
if len(pods) > 0 {
ord := int(rand.Int63n(int64(len(pods))))
pod := pods[ord]
switch pod.Status.Phase {
case v1.PodPending:
t.Logf("set pod %s running", pod.Name)
if pods, err = spc.setPodRunning(set, ord); err != nil {
return err
}
case v1.PodRunning:
t.Logf("set pod %s ready", pod.Name)
if pods, err = spc.setPodReady(set, ord); err != nil {
return err
}
default:
continue
}
}
// run the controller once and check invariants
if err = ssc.UpdateStatefulSet(set, pods); err != nil {
return err
}
set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
if err != nil {
return err
}
if err := assertInvariants(set, spc); err != nil {
if err := invariants(set, spc); err != nil {
return err
}
}
return assertInvariants(set, spc)
return invariants(set, spc)
}
func scaleDownStatefulSetControl(set *apps.StatefulSet, ssc StatefulSetControlInterface, spc *fakeStatefulPodControl) error {
func scaleDownStatefulSetControl(t *testing.T, set *apps.StatefulSet, ssc StatefulSetControlInterface, spc *fakeStatefulPodControl, invariants invariantFunc) error {
selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
if err != nil {
return err
@@ -965,6 +954,7 @@ func scaleDownStatefulSetControl(set *apps.StatefulSet, ssc StatefulSetControlIn
if err != nil {
return err
}
sort.Sort(ascendingOrdinal(pods))
if ordinal := len(pods) - 1; ordinal >= 0 {
if err := ssc.UpdateStatefulSet(set, pods); err != nil {
return err
@@ -997,9 +987,9 @@ func scaleDownStatefulSetControl(set *apps.StatefulSet, ssc StatefulSetControlIn
if err != nil {
return err
}
if err := assertInvariants(set, spc); err != nil {
if err := invariants(set, spc); err != nil {
return err
}
}
return assertInvariants(set, spc)
return invariants(set, spc)
}

View File

@@ -640,7 +640,7 @@ func scaleUpStatefulSetController(set *apps.StatefulSet, ssc *StatefulSetControl
pod = getPodAtOrdinal(pods, ord)
ssc.updatePod(&prev, pod)
fakeWorker(ssc)
if err := assertInvariants(set, spc); err != nil {
if err := assertMonotonicInvariants(set, spc); err != nil {
return err
}
if obj, _, err := spc.setsIndexer.Get(set); err != nil {
@@ -650,7 +650,7 @@ func scaleUpStatefulSetController(set *apps.StatefulSet, ssc *StatefulSetControl
}
}
return assertInvariants(set, spc)
return assertMonotonicInvariants(set, spc)
}
func scaleDownStatefulSetController(set *apps.StatefulSet, ssc *StatefulSetController, spc *fakeStatefulPodControl) error {
@@ -692,5 +692,5 @@ func scaleDownStatefulSetController(set *apps.StatefulSet, ssc *StatefulSetContr
set = obj.(*apps.StatefulSet)
}
}
return assertInvariants(set, spc)
return assertMonotonicInvariants(set, spc)
}

View File

@@ -227,6 +227,11 @@ func isHealthy(pod *v1.Pod) bool {
return isRunningAndReady(pod) && !isTerminating(pod)
}
// allowsBurst is true if the set's Spec.PodManagementPolicy field is apps.ParallelPodManagement.
// The decision is based on that spec field only; no annotation is consulted.
func allowsBurst(set *apps.StatefulSet) bool {
return set.Spec.PodManagementPolicy == apps.ParallelPodManagement
}
// newControllerRef returns an ControllerRef pointing to a given StatefulSet.
func newControllerRef(set *apps.StatefulSet) *metav1.OwnerReference {
blockOwnerDeletion := true