Merge pull request #49142 from joelsmith/slowstart

Automatic merge from submit-queue (batch tested with PRs 50602, 51561, 51703, 51748, 49142)

Slow-start batch pod creation of rs, rc, ds, jobs

Prevent controllers with very large replica counts from generating enormous
numbers of events by creating only a few pods at a time, then doubling the
batch size as pod creations succeed. Stop creating further batches as soon
as any pod creation error is encountered.
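
The batching pattern is roughly the following. This is a minimal, sequential sketch; the controllers run each batch's creates concurrently, and `slowStartBatch`, `minInt`, and the quota closure are illustrative names rather than identifiers from this PR:

```go
package main

import "fmt"

// slowStartBatch calls create up to total times, in exponentially growing
// batches (initialBatchSize, 2*initialBatchSize, 4*initialBatchSize, ...),
// and stops scheduling further batches as soon as any call in the current
// batch fails. It returns the number of successful calls.
func slowStartBatch(total, initialBatchSize int, create func() error) (int, error) {
	remaining := total
	successes := 0
	for batch := minInt(initialBatchSize, remaining); batch > 0; batch = minInt(2*batch, remaining) {
		var firstErr error
		failures := 0
		for i := 0; i < batch; i++ { // the real controllers issue these concurrently
			if err := create(); err != nil {
				failures++
				if firstErr == nil {
					firstErr = err
				}
			}
		}
		successes += batch - failures
		remaining -= batch
		if firstErr != nil {
			// The remaining creations would most likely fail the same way,
			// so give up for this sync and let the next resync retry.
			return successes, firstErr
		}
	}
	return successes, nil
}

func minInt(a, b int) int {
	if a < b {
		return a
	}
	return b
}

func main() {
	n := 0
	quota := 10
	created, err := slowStartBatch(100, 1, func() error {
		if n >= quota {
			return fmt.Errorf("quota exceeded")
		}
		n++
		return nil
	})
	fmt.Println(created, err) // 10 quota exceeded
}
```

With a quota of 10 and 100 requested pods, only 15 create calls are attempted (batches of 1, 2, 4, and 8) instead of 100.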

Todo:

- [x] Add automated tests
- [x] Test ds

Fixes https://github.com/kubernetes/kubernetes/issues/49145

**Release note**:
```release-note
controllers back off better in the face of quota denial
```
Merged by Kubernetes Submit Queue on 2017-09-03 01:12:14 -07:00, committed via GitHub.
Commit 28857a2f02
13 changed files with 354 additions and 131 deletions

View File

@ -65,6 +65,21 @@ const (
// 500 pods. Just creation is limited to 20qps, and watching happens with ~10-30s
// latency/pod at the scale of 3000 pods over 100 nodes.
ExpectationsTimeout = 5 * time.Minute
// When batching pod creates, SlowStartInitialBatchSize is the size of the
// initial batch. The size of each successive batch is twice the size of
// the previous batch. For example, for a value of 1, batch sizes would be
// 1, 2, 4, 8, ... and for a value of 10, batch sizes would be
// 10, 20, 40, 80, ... Setting the value higher means that quota denials
// will result in more doomed API calls and associated event spam. Setting
// the value lower will result in more API call round trip periods for
// large batches.
//
// Given a number of pods to start "N":
// The number of doomed calls per sync once quota is exceeded is given by:
// min(N,SlowStartInitialBatchSize)
// The number of batches is given by:
// 1+floor(log_2(ceil(N/SlowStartInitialBatchSize)))
SlowStartInitialBatchSize = 1
)
var UpdateTaintBackoff = wait.Backoff{
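
To make the formulas above concrete (a standalone, hypothetical snippet, not part of the change): with SlowStartInitialBatchSize = 1 and N = 100, the batch sizes are 1, 2, 4, 8, 16, 32, 37, i.e. 1 + floor(log_2(ceil(100/1))) = 7 batches, and once quota is exhausted each subsequent sync costs only min(100, 1) = 1 doomed create call.

```go
package main

import "fmt"

// batches lists the slow-start batch sizes for n pods, mirroring the
// doubling rule described in the comment above (illustrative helper only).
func batches(n, initial int) []int {
	var sizes []int
	remaining := n
	size := initial
	if size > remaining {
		size = remaining
	}
	for remaining > 0 {
		sizes = append(sizes, size)
		remaining -= size
		if 2*size < remaining {
			size = 2 * size
		} else {
			size = remaining
		}
	}
	return sizes
}

func main() {
	fmt.Println(batches(100, 1))  // [1 2 4 8 16 32 37]
	fmt.Println(batches(100, 10)) // [10 20 40 30]
}
```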
@ -606,11 +621,13 @@ func (r RealPodControl) DeletePod(namespace string, podID string, object runtime
type FakePodControl struct {
sync.Mutex
Templates []v1.PodTemplateSpec
ControllerRefs []metav1.OwnerReference
DeletePodName []string
Patches [][]byte
Err error
Templates []v1.PodTemplateSpec
ControllerRefs []metav1.OwnerReference
DeletePodName []string
Patches [][]byte
Err error
CreateLimit int
CreateCallCount int
}
var _ PodControlInterface = &FakePodControl{}
@ -628,6 +645,10 @@ func (f *FakePodControl) PatchPod(namespace, name string, data []byte) error {
func (f *FakePodControl) CreatePods(namespace string, spec *v1.PodTemplateSpec, object runtime.Object) error {
f.Lock()
defer f.Unlock()
f.CreateCallCount++
if f.CreateLimit != 0 && f.CreateCallCount > f.CreateLimit {
return fmt.Errorf("Not creating pod, limit %d already reached (create call %d)", f.CreateLimit, f.CreateCallCount)
}
f.Templates = append(f.Templates, *spec)
if f.Err != nil {
return f.Err
@ -638,6 +659,10 @@ func (f *FakePodControl) CreatePods(namespace string, spec *v1.PodTemplateSpec,
func (f *FakePodControl) CreatePodsWithControllerRef(namespace string, spec *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error {
f.Lock()
defer f.Unlock()
f.CreateCallCount++
if f.CreateLimit != 0 && f.CreateCallCount > f.CreateLimit {
return fmt.Errorf("Not creating pod, limit %d already reached (create call %d)", f.CreateLimit, f.CreateCallCount)
}
f.Templates = append(f.Templates, *spec)
f.ControllerRefs = append(f.ControllerRefs, *controllerRef)
if f.Err != nil {
@ -649,6 +674,10 @@ func (f *FakePodControl) CreatePodsWithControllerRef(namespace string, spec *v1.
func (f *FakePodControl) CreatePodsOnNode(nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error {
f.Lock()
defer f.Unlock()
f.CreateCallCount++
if f.CreateLimit != 0 && f.CreateCallCount > f.CreateLimit {
return fmt.Errorf("Not creating pod, limit %d already reached (create call %d)", f.CreateLimit, f.CreateCallCount)
}
f.Templates = append(f.Templates, *template)
f.ControllerRefs = append(f.ControllerRefs, *controllerRef)
if f.Err != nil {
@ -674,6 +703,8 @@ func (f *FakePodControl) Clear() {
f.Templates = []v1.PodTemplateSpec{}
f.ControllerRefs = []metav1.OwnerReference{}
f.Patches = [][]byte{}
f.CreateLimit = 0
f.CreateCallCount = 0
}
// ByLogging allows custom sorting of pods so the best one can be picked for getting its logs.

View File

@ -53,6 +53,7 @@ go_library(
"//vendor/k8s.io/client-go/listers/extensions/v1beta1:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/k8s.io/client-go/tools/record:go_default_library",
"//vendor/k8s.io/client-go/util/integer:go_default_library",
"//vendor/k8s.io/client-go/util/workqueue:go_default_library",
],
)

View File

@ -46,6 +46,7 @@ import (
extensionslisters "k8s.io/client-go/listers/extensions/v1beta1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/integer"
"k8s.io/client-go/util/workqueue"
v1helper "k8s.io/kubernetes/pkg/api/v1/helper"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
@ -904,31 +905,54 @@ func (dsc *DaemonSetsController) syncNodes(ds *extensions.DaemonSet, podsToDelet
glog.V(4).Infof("Nodes needing daemon pods for daemon set %s: %+v, creating %d", ds.Name, nodesNeedingDaemonPods, createDiff)
createWait := sync.WaitGroup{}
createWait.Add(createDiff)
template := util.CreatePodTemplate(ds.Spec.Template, ds.Spec.TemplateGeneration, hash)
for i := 0; i < createDiff; i++ {
go func(ix int) {
defer createWait.Done()
err := dsc.podControl.CreatePodsOnNode(nodesNeedingDaemonPods[ix], ds.Namespace, &template, ds, metav1.NewControllerRef(ds, controllerKind))
if err != nil && errors.IsTimeout(err) {
// Pod is created but its initialization has timed out.
// If the initialization is successful eventually, the
// controller will observe the creation via the informer.
// If the initialization fails, or if the pod keeps
// uninitialized for a long time, the informer will not
// receive any update, and the controller will create a new
// pod when the expectation expires.
return
}
if err != nil {
glog.V(2).Infof("Failed creation, decrementing expectations for set %q/%q", ds.Namespace, ds.Name)
// Batch the pod creates. Batch sizes start at SlowStartInitialBatchSize
// and double with each successful iteration in a kind of "slow start".
// This handles attempts to start large numbers of pods that would
// likely all fail with the same error. For example a project with a
// low quota that attempts to create a large number of pods will be
// prevented from spamming the API service with the pod create requests
// after one of its pods fails. Conveniently, this also prevents the
// event spam that those failures would generate.
batchSize := integer.IntMin(createDiff, controller.SlowStartInitialBatchSize)
for pos := 0; createDiff > pos; batchSize, pos = integer.IntMin(2*batchSize, createDiff-(pos+batchSize)), pos+batchSize {
errorCount := len(errCh)
createWait.Add(batchSize)
for i := pos; i < pos+batchSize; i++ {
go func(ix int) {
defer createWait.Done()
err := dsc.podControl.CreatePodsOnNode(nodesNeedingDaemonPods[ix], ds.Namespace, &template, ds, metav1.NewControllerRef(ds, controllerKind))
if err != nil && errors.IsTimeout(err) {
// Pod is created but its initialization has timed out.
// If the initialization is successful eventually, the
// controller will observe the creation via the informer.
// If the initialization fails, or if the pod keeps
// uninitialized for a long time, the informer will not
// receive any update, and the controller will create a new
// pod when the expectation expires.
return
}
if err != nil {
glog.V(2).Infof("Failed creation, decrementing expectations for set %q/%q", ds.Namespace, ds.Name)
dsc.expectations.CreationObserved(dsKey)
errCh <- err
utilruntime.HandleError(err)
}
}(i)
}
createWait.Wait()
// any skipped pods that we never attempted to start shouldn't be expected.
skippedPods := createDiff - batchSize
if errorCount < len(errCh) && skippedPods > 0 {
glog.V(2).Infof("Slow-start failure. Skipping creation of %d pods, decrementing expectations for set %q/%q", skippedPods, ds.Namespace, ds.Name)
for i := 0; i < skippedPods; i++ {
dsc.expectations.CreationObserved(dsKey)
errCh <- err
utilruntime.HandleError(err)
}
}(i)
// The skipped pods will be retried later. The next controller resync will
// retry the slow start process.
break
}
}
createWait.Wait()
glog.V(4).Infof("Pods to delete for daemon set %s: %+v, deleting %d", ds.Name, podsToDelete, deleteDiff)
deleteWait := sync.WaitGroup{}

View File

@ -412,6 +412,28 @@ func TestSimpleDaemonSetLaunchesPods(t *testing.T) {
}
}
// Simulate a cluster with 100 nodes, but simulate a limit (like a quota limit)
// of 10 pods, and verify that the ds doesn't make 100 create calls per sync pass
func TestSimpleDaemonSetPodCreateErrors(t *testing.T) {
for _, strategy := range updateStrategies() {
ds := newDaemonSet("foo")
ds.Spec.UpdateStrategy = *strategy
manager, podControl, _ := newTestController(ds)
podControl.FakePodControl.CreateLimit = 10
addNodes(manager.nodeStore, 0, podControl.FakePodControl.CreateLimit*10, nil)
manager.dsStore.Add(ds)
syncAndValidateDaemonSets(t, manager, ds, podControl, podControl.FakePodControl.CreateLimit, 0, 0)
expectedLimit := 0
for pass := uint8(0); expectedLimit <= podControl.FakePodControl.CreateLimit; pass++ {
expectedLimit += controller.SlowStartInitialBatchSize << pass
}
if podControl.FakePodControl.CreateCallCount > expectedLimit {
t.Errorf("Unexpected number of create calls. Expected <= %d, saw %d\n", podControl.FakePodControl.CreateLimit*2, podControl.FakePodControl.CreateCallCount)
}
}
}
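
(Aside, not part of the diff: with CreateLimit = 10 and SlowStartInitialBatchSize = 1, expectedLimit above accumulates 1 + 2 + 4 + 8 = 15 before exceeding the limit. That is exactly the controller's worst case: the batches of 1, 2, and 4 succeed, only the batch of 8 hits failures, and the sync makes at most 15 create calls instead of 100, one per node.)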
func TestSimpleDaemonSetUpdatesStatusAfterLaunchingPods(t *testing.T) {
for _, strategy := range updateStrategies() {
ds := newDaemonSet("foo")

View File

@ -33,6 +33,7 @@ go_library(
"//vendor/k8s.io/client-go/listers/core/v1:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/k8s.io/client-go/tools/record:go_default_library",
"//vendor/k8s.io/client-go/util/integer:go_default_library",
"//vendor/k8s.io/client-go/util/workqueue:go_default_library",
],
)

View File

@ -39,6 +39,7 @@ import (
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/integer"
"k8s.io/client-go/util/workqueue"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/util/metrics"
@ -652,34 +653,60 @@ func (jm *JobController) manageJob(activePods []*v1.Pod, succeeded int32, job *b
active += diff
wait := sync.WaitGroup{}
wait.Add(int(diff))
for i := int32(0); i < diff; i++ {
go func() {
defer wait.Done()
err := jm.podControl.CreatePodsWithControllerRef(job.Namespace, &job.Spec.Template, job, metav1.NewControllerRef(job, controllerKind))
if err != nil && errors.IsTimeout(err) {
// Pod is created but its initialization has timed out.
// If the initialization is successful eventually, the
// controller will observe the creation via the informer.
// If the initialization fails, or if the pod keeps
// uninitialized for a long time, the informer will not
// receive any update, and the controller will create a new
// pod when the expectation expires.
return
}
if err != nil {
defer utilruntime.HandleError(err)
// Batch the pod creates. Batch sizes start at SlowStartInitialBatchSize
// and double with each successful iteration in a kind of "slow start".
// This handles attempts to start large numbers of pods that would
// likely all fail with the same error. For example a project with a
// low quota that attempts to create a large number of pods will be
// prevented from spamming the API service with the pod create requests
// after one of its pods fails. Conveniently, this also prevents the
// event spam that those failures would generate.
for batchSize := int32(integer.IntMin(int(diff), controller.SlowStartInitialBatchSize)); diff > 0; batchSize = integer.Int32Min(2*batchSize, diff) {
errorCount := len(errCh)
wait.Add(int(batchSize))
for i := int32(0); i < batchSize; i++ {
go func() {
defer wait.Done()
err := jm.podControl.CreatePodsWithControllerRef(job.Namespace, &job.Spec.Template, job, metav1.NewControllerRef(job, controllerKind))
if err != nil && errors.IsTimeout(err) {
// Pod is created but its initialization has timed out.
// If the initialization is successful eventually, the
// controller will observe the creation via the informer.
// If the initialization fails, or if the pod keeps
// uninitialized for a long time, the informer will not
// receive any update, and the controller will create a new
// pod when the expectation expires.
return
}
if err != nil {
defer utilruntime.HandleError(err)
// Decrement the expected number of creates because the informer won't observe this pod
glog.V(2).Infof("Failed creation, decrementing expectations for job %q/%q", job.Namespace, job.Name)
jm.expectations.CreationObserved(jobKey)
activeLock.Lock()
active--
activeLock.Unlock()
errCh <- err
}
}()
}
wait.Wait()
// any skipped pods that we never attempted to start shouldn't be expected.
skippedPods := diff - batchSize
if errorCount < len(errCh) && skippedPods > 0 {
glog.V(2).Infof("Slow-start failure. Skipping creation of %d pods, decrementing expectations for job %q/%q", skippedPods, job.Namespace, job.Name)
active -= skippedPods
for i := int32(0); i < skippedPods; i++ {
// Decrement the expected number of creates because the informer won't observe this pod
glog.V(2).Infof("Failed creation, decrementing expectations for job %q/%q", job.Namespace, job.Name)
jm.expectations.CreationObserved(jobKey)
activeLock.Lock()
active--
activeLock.Unlock()
errCh <- err
}
}()
// The skipped pods will be retried later. The next controller resync will
// retry the slow start process.
break
}
diff -= batchSize
}
wait.Wait()
}
select {

View File

@ -124,6 +124,7 @@ func TestControllerSyncJob(t *testing.T) {
parallelism int32
completions int32
deleting bool
podLimit int
// pod setup
podControllerError error
@ -141,102 +142,107 @@ func TestControllerSyncJob(t *testing.T) {
expectedComplete bool
}{
"job start": {
2, 5, false,
2, 5, false, 0,
nil, 0, 0, 0, 0,
2, 0, 2, 0, 0, false,
},
"WQ job start": {
2, -1, false,
2, -1, false, 0,
nil, 0, 0, 0, 0,
2, 0, 2, 0, 0, false,
},
"pending pods": {
2, 5, false,
2, 5, false, 0,
nil, 2, 0, 0, 0,
0, 0, 2, 0, 0, false,
},
"correct # of pods": {
2, 5, false,
2, 5, false, 0,
nil, 0, 2, 0, 0,
0, 0, 2, 0, 0, false,
},
"WQ job: correct # of pods": {
2, -1, false,
2, -1, false, 0,
nil, 0, 2, 0, 0,
0, 0, 2, 0, 0, false,
},
"too few active pods": {
2, 5, false,
2, 5, false, 0,
nil, 0, 1, 1, 0,
1, 0, 2, 1, 0, false,
},
"too few active pods with a dynamic job": {
2, -1, false,
2, -1, false, 0,
nil, 0, 1, 0, 0,
1, 0, 2, 0, 0, false,
},
"too few active pods, with controller error": {
2, 5, false,
2, 5, false, 0,
fmt.Errorf("Fake error"), 0, 1, 1, 0,
1, 0, 1, 1, 0, false,
},
"too many active pods": {
2, 5, false,
2, 5, false, 0,
nil, 0, 3, 0, 0,
0, 1, 2, 0, 0, false,
},
"too many active pods, with controller error": {
2, 5, false,
2, 5, false, 0,
fmt.Errorf("Fake error"), 0, 3, 0, 0,
0, 1, 3, 0, 0, false,
},
"failed pod": {
2, 5, false,
2, 5, false, 0,
nil, 0, 1, 1, 1,
1, 0, 2, 1, 1, false,
},
"job finish": {
2, 5, false,
2, 5, false, 0,
nil, 0, 0, 5, 0,
0, 0, 0, 5, 0, true,
},
"WQ job finishing": {
2, -1, false,
2, -1, false, 0,
nil, 0, 1, 1, 0,
0, 0, 1, 1, 0, false,
},
"WQ job all finished": {
2, -1, false,
2, -1, false, 0,
nil, 0, 0, 2, 0,
0, 0, 0, 2, 0, true,
},
"WQ job all finished despite one failure": {
2, -1, false,
2, -1, false, 0,
nil, 0, 0, 1, 1,
0, 0, 0, 1, 1, true,
},
"more active pods than completions": {
2, 5, false,
2, 5, false, 0,
nil, 0, 10, 0, 0,
0, 8, 2, 0, 0, false,
},
"status change": {
2, 5, false,
2, 5, false, 0,
nil, 0, 2, 2, 0,
0, 0, 2, 2, 0, false,
},
"deleting job": {
2, 5, true,
2, 5, true, 0,
nil, 1, 1, 1, 0,
0, 0, 2, 1, 0, false,
},
"limited pods": {
100, 200, false, 10,
nil, 0, 0, 0, 0,
10, 0, 10, 0, 0, false,
},
}
for name, tc := range testCases {
// job manager setup
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}})
manager, sharedInformerFactory := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
fakePodControl := controller.FakePodControl{Err: tc.podControllerError}
fakePodControl := controller.FakePodControl{Err: tc.podControllerError, CreateLimit: tc.podLimit}
manager.podControl = &fakePodControl
manager.podStoreSynced = alwaysReady
manager.jobStoreSynced = alwaysReady
@ -276,7 +282,7 @@ func TestControllerSyncJob(t *testing.T) {
t.Errorf("%s: Syncing jobs would return error when podController exception", name)
}
} else {
if err != nil {
if err != nil && (tc.podLimit == 0 || fakePodControl.CreateCallCount < tc.podLimit) {
t.Errorf("%s: unexpected error when syncing jobs %v", name, err)
}
}
@ -327,6 +333,14 @@ func TestControllerSyncJob(t *testing.T) {
if tc.expectedComplete && !getCondition(actual, batch.JobComplete) {
t.Errorf("%s: expected completion condition. Got %#v", name, actual.Status.Conditions)
}
// validate slow start
expectedLimit := 0
for pass := uint8(0); expectedLimit <= tc.podLimit; pass++ {
expectedLimit += controller.SlowStartInitialBatchSize << pass
}
if tc.podLimit > 0 && fakePodControl.CreateCallCount > expectedLimit {
t.Errorf("%s: Unexpected number of create calls. Expected <= %d, saw %d\n", name, fakePodControl.CreateLimit*2, fakePodControl.CreateCallCount)
}
}
}

View File

@ -35,6 +35,7 @@ go_library(
"//vendor/k8s.io/client-go/listers/extensions/v1beta1:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/k8s.io/client-go/tools/record:go_default_library",
"//vendor/k8s.io/client-go/util/integer:go_default_library",
"//vendor/k8s.io/client-go/util/workqueue:go_default_library",
],
)

View File

@ -43,6 +43,7 @@ import (
extensionslisters "k8s.io/client-go/listers/extensions/v1beta1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/integer"
"k8s.io/client-go/util/workqueue"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/controller"
@ -450,41 +451,65 @@ func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *exte
// beforehand and store it via ExpectCreations.
rsc.expectations.ExpectCreations(rsKey, diff)
var wg sync.WaitGroup
wg.Add(diff)
glog.V(2).Infof("Too few %q/%q replicas, need %d, creating %d", rs.Namespace, rs.Name, *(rs.Spec.Replicas), diff)
for i := 0; i < diff; i++ {
go func() {
defer wg.Done()
var err error
boolPtr := func(b bool) *bool { return &b }
controllerRef := &metav1.OwnerReference{
APIVersion: controllerKind.GroupVersion().String(),
Kind: controllerKind.Kind,
Name: rs.Name,
UID: rs.UID,
BlockOwnerDeletion: boolPtr(true),
Controller: boolPtr(true),
}
err = rsc.podControl.CreatePodsWithControllerRef(rs.Namespace, &rs.Spec.Template, rs, controllerRef)
if err != nil && errors.IsTimeout(err) {
// Pod is created but its initialization has timed out.
// If the initialization is successful eventually, the
// controller will observe the creation via the informer.
// If the initialization fails, or if the pod keeps
// uninitialized for a long time, the informer will not
// receive any update, and the controller will create a new
// pod when the expectation expires.
return
}
if err != nil {
// Batch the pod creates. Batch sizes start at SlowStartInitialBatchSize
// and double with each successful iteration in a kind of "slow start".
// This handles attempts to start large numbers of pods that would
// likely all fail with the same error. For example a project with a
// low quota that attempts to create a large number of pods will be
// prevented from spamming the API service with the pod create requests
// after one of its pods fails. Conveniently, this also prevents the
// event spam that those failures would generate.
for batchSize := integer.IntMin(diff, controller.SlowStartInitialBatchSize); diff > 0; batchSize = integer.IntMin(2*batchSize, diff) {
errorCount := len(errCh)
wg.Add(batchSize)
for i := 0; i < batchSize; i++ {
go func() {
defer wg.Done()
var err error
boolPtr := func(b bool) *bool { return &b }
controllerRef := &metav1.OwnerReference{
APIVersion: controllerKind.GroupVersion().String(),
Kind: controllerKind.Kind,
Name: rs.Name,
UID: rs.UID,
BlockOwnerDeletion: boolPtr(true),
Controller: boolPtr(true),
}
err = rsc.podControl.CreatePodsWithControllerRef(rs.Namespace, &rs.Spec.Template, rs, controllerRef)
if err != nil && errors.IsTimeout(err) {
// Pod is created but its initialization has timed out.
// If the initialization is successful eventually, the
// controller will observe the creation via the informer.
// If the initialization fails, or if the pod keeps
// uninitialized for a long time, the informer will not
// receive any update, and the controller will create a new
// pod when the expectation expires.
return
}
if err != nil {
// Decrement the expected number of creates because the informer won't observe this pod
glog.V(2).Infof("Failed creation, decrementing expectations for replica set %q/%q", rs.Namespace, rs.Name)
rsc.expectations.CreationObserved(rsKey)
errCh <- err
}
}()
}
wg.Wait()
// any skipped pods that we never attempted to start shouldn't be expected.
skippedPods := diff - batchSize
if errorCount < len(errCh) && skippedPods > 0 {
glog.V(2).Infof("Slow-start failure. Skipping creation of %d pods, decrementing expectations for replica set %q/%q", skippedPods, rs.Namespace, rs.Name)
for i := 0; i < skippedPods; i++ {
// Decrement the expected number of creates because the informer won't observe this pod
glog.V(2).Infof("Failed creation, decrementing expectations for replica set %q/%q", rs.Namespace, rs.Name)
rsc.expectations.CreationObserved(rsKey)
errCh <- err
}
}()
// The skipped pods will be retried later. The next controller resync will
// retry the slow start process.
break
}
diff -= batchSize
}
wg.Wait()
} else if diff > 0 {
if diff > rsc.burstReplicas {
diff = rsc.burstReplicas
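
The skipped-pod handling above ties into the controller's expectations bookkeeping: ExpectCreations(rsKey, diff) is recorded before any creates are attempted, so every pod that fails or is never attempted must be marked as observed by the controller itself, or the next sync would sit out ExpectationsTimeout waiting for informer events that never come. A rough, simplified sketch of that accounting (hypothetical type, not the real ControllerExpectations API):

```go
package main

import "fmt"

// Simplified stand-in for the controller's creation expectations
// (hypothetical type; the real implementation is keyed per controller).
type expectations struct{ pending int }

func (e *expectations) ExpectCreations(n int) { e.pending = n }
func (e *expectations) CreationObserved()     { e.pending-- }
func (e *expectations) Satisfied() bool       { return e.pending <= 0 }

func main() {
	exp := &expectations{}
	const diff = 100 // pods this sync wants to create
	exp.ExpectCreations(diff)

	// Suppose quota admits only 3 pods: the batches of 1 and 2 succeed,
	// the batch of 4 fails entirely, and the remaining 93 are skipped.
	created, failed, skipped := 3, 4, 93

	for i := 0; i < created; i++ {
		exp.CreationObserved() // normally observed via the pod informer
	}
	for i := 0; i < failed+skipped; i++ {
		exp.CreationObserved() // decremented directly, as in the code above
	}
	fmt.Println(exp.Satisfied()) // true: the next resync is not blocked
}
```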

View File

@ -324,6 +324,33 @@ func TestSyncReplicaSetCreates(t *testing.T) {
validateSyncReplicaSet(t, &fakePodControl, 2, 0, 0)
}
// Tell the rs to create 100 replicas, but simulate a limit (like a quota limit)
// of 10, and verify that the rs doesn't make 100 create calls per sync pass
func TestSyncReplicaSetCreateFailures(t *testing.T) {
fakePodControl := controller.FakePodControl{}
fakePodControl.CreateLimit = 10
labelMap := map[string]string{"foo": "bar"}
rs := newReplicaSet(fakePodControl.CreateLimit*10, labelMap)
client := fake.NewSimpleClientset(rs)
stopCh := make(chan struct{})
defer close(stopCh)
manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas)
informers.Extensions().V1beta1().ReplicaSets().Informer().GetIndexer().Add(rs)
manager.podControl = &fakePodControl
manager.syncReplicaSet(getKey(rs, t))
validateSyncReplicaSet(t, &fakePodControl, fakePodControl.CreateLimit, 0, 0)
expectedLimit := 0
for pass := uint8(0); expectedLimit <= fakePodControl.CreateLimit; pass++ {
expectedLimit += controller.SlowStartInitialBatchSize << pass
}
if fakePodControl.CreateCallCount > expectedLimit {
t.Errorf("Unexpected number of create calls. Expected <= %d, saw %d\n", fakePodControl.CreateLimit*2, fakePodControl.CreateCallCount)
}
}
func TestStatusUpdatesWithoutReplicasChange(t *testing.T) {
// Setup a fake server to listen for requests, and run the ReplicaSet controller in steady state
fakeHandler := utiltesting.FakeHandler{

View File

@ -32,6 +32,7 @@ go_library(
"//vendor/k8s.io/client-go/listers/core/v1:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/k8s.io/client-go/tools/record:go_default_library",
"//vendor/k8s.io/client-go/util/integer:go_default_library",
"//vendor/k8s.io/client-go/util/workqueue:go_default_library",
],
)

View File

@ -40,6 +40,7 @@ import (
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/integer"
"k8s.io/client-go/util/workqueue"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/controller"
@ -446,42 +447,66 @@ func (rm *ReplicationManager) manageReplicas(filteredPods []*v1.Pod, rc *v1.Repl
errCh := make(chan error, diff)
rm.expectations.ExpectCreations(rcKey, diff)
var wg sync.WaitGroup
wg.Add(diff)
glog.V(2).Infof("Too few %q/%q replicas, need %d, creating %d", rc.Namespace, rc.Name, *(rc.Spec.Replicas), diff)
for i := 0; i < diff; i++ {
go func() {
defer wg.Done()
var err error
boolPtr := func(b bool) *bool { return &b }
controllerRef := &metav1.OwnerReference{
APIVersion: controllerKind.GroupVersion().String(),
Kind: controllerKind.Kind,
Name: rc.Name,
UID: rc.UID,
BlockOwnerDeletion: boolPtr(true),
Controller: boolPtr(true),
}
err = rm.podControl.CreatePodsWithControllerRef(rc.Namespace, rc.Spec.Template, rc, controllerRef)
if err != nil && errors.IsTimeout(err) {
// Pod is created but its initialization has timed out.
// If the initialization is successful eventually, the
// controller will observe the creation via the informer.
// If the initialization fails, or if the pod keeps
// uninitialized for a long time, the informer will not
// receive any update, and the controller will create a new
// pod when the expectation expires.
return
}
if err != nil {
// Batch the pod creates. Batch sizes start at SlowStartInitialBatchSize
// and double with each successful iteration in a kind of "slow start".
// This handles attempts to start large numbers of pods that would
// likely all fail with the same error. For example a project with a
// low quota that attempts to create a large number of pods will be
// prevented from spamming the API service with the pod create requests
// after one of its pods fails. Conveniently, this also prevents the
// event spam that those failures would generate.
for batchSize := integer.IntMin(diff, controller.SlowStartInitialBatchSize); diff > 0; batchSize = integer.IntMin(2*batchSize, diff) {
errorCount := len(errCh)
wg.Add(batchSize)
for i := 0; i < batchSize; i++ {
go func() {
defer wg.Done()
var err error
boolPtr := func(b bool) *bool { return &b }
controllerRef := &metav1.OwnerReference{
APIVersion: controllerKind.GroupVersion().String(),
Kind: controllerKind.Kind,
Name: rc.Name,
UID: rc.UID,
BlockOwnerDeletion: boolPtr(true),
Controller: boolPtr(true),
}
err = rm.podControl.CreatePodsWithControllerRef(rc.Namespace, rc.Spec.Template, rc, controllerRef)
if err != nil && errors.IsTimeout(err) {
// Pod is created but its initialization has timed out.
// If the initialization is successful eventually, the
// controller will observe the creation via the informer.
// If the initialization fails, or if the pod keeps
// uninitialized for a long time, the informer will not
// receive any update, and the controller will create a new
// pod when the expectation expires.
return
}
if err != nil {
// Decrement the expected number of creates because the informer won't observe this pod
glog.V(2).Infof("Failed creation, decrementing expectations for controller %q/%q", rc.Namespace, rc.Name)
rm.expectations.CreationObserved(rcKey)
errCh <- err
utilruntime.HandleError(err)
}
}()
}
wg.Wait()
// any skipped pods that we never attempted to start shouldn't be expected.
skippedPods := diff - batchSize
if errorCount < len(errCh) && skippedPods > 0 {
glog.V(2).Infof("Slow-start failure. Skipping creation of %d pods, decrementing expectations for controller %q/%q", skippedPods, rc.Namespace, rc.Name)
for i := 0; i < skippedPods; i++ {
// Decrement the expected number of creates because the informer won't observe this pod
glog.V(2).Infof("Failed creation, decrementing expectations for controller %q/%q", rc.Namespace, rc.Name)
rm.expectations.CreationObserved(rcKey)
errCh <- err
utilruntime.HandleError(err)
}
}()
// The skipped pods will be retried later. The next controller resync will
// retry the slow start process.
break
}
diff -= batchSize
}
wg.Wait()
select {
case err := <-errCh:

View File

@ -284,6 +284,30 @@ func TestSyncReplicationControllerCreates(t *testing.T) {
validateSyncReplication(t, &fakePodControl, 2, 0, 0)
}
// Tell the controller to create 100 replicas, but simulate a limit (like a quota limit)
// of 10, and verify that the controller doesn't make 100 create calls per sync pass
func TestSyncReplicationControllerCreateFailures(t *testing.T) {
fakePodControl := controller.FakePodControl{}
fakePodControl.CreateLimit = 10
rc := newReplicationController(fakePodControl.CreateLimit * 10)
c := fake.NewSimpleClientset(rc)
manager, _ /*podInformer*/, rcInformer := newReplicationManagerFromClient(c, BurstReplicas)
rcInformer.Informer().GetIndexer().Add(rc)
manager.podControl = &fakePodControl
manager.syncReplicationController(getKey(rc, t))
validateSyncReplication(t, &fakePodControl, fakePodControl.CreateLimit, 0, 0)
expectedLimit := 0
for pass := uint8(0); expectedLimit <= fakePodControl.CreateLimit; pass++ {
expectedLimit += controller.SlowStartInitialBatchSize << pass
}
if fakePodControl.CreateCallCount > expectedLimit {
t.Errorf("Unexpected number of create calls. Expected <= %d, saw %d\n", fakePodControl.CreateLimit*2, fakePodControl.CreateCallCount)
}
}
func TestStatusUpdatesWithoutReplicasChange(t *testing.T) {
// Setup a fake server to listen for requests, and run the rc manager in steady state
fakeHandler := utiltesting.FakeHandler{