Merge pull request #49915 from caesarxuchao/controller-ignore-initialize-timeout

Automatic merge from submit-queue (batch tested with PRs 49855, 49915)

Let controllers ignore initialization timeout when creating pods

Partially address https://github.com/kubernetes/kubernetes/issues/48893#issuecomment-318540129.

This only updates the controllers that create pods with `GenerateName`.

The controllers ignore the timeout error when creating the pods; what happens next depends on how the initialization progresses:
* If the initialization is successful in less than 5 mins, the controller will observe the creation via the informer. All is good.
* If the initialization fails, the server will delete the pod, but the controller won't receive any event. The controller will not create a new pod until the Creation expectation expires in 5 min.
* If the initialization takes too long (> 5 mins), the Creation expectation expires and the controller will create extra pods.

I'll send follow-up PRs to fix the latter two cases, e.g., by refactoring the sharedInformer.
This commit is contained in:
Kubernetes Submit Queue 2017-08-05 19:07:53 -07:00 committed by GitHub
commit f75f49e7a0
7 changed files with 165 additions and 14 deletions

View File

@ -580,7 +580,7 @@ func (r RealPodControl) createPods(nodeName, namespace string, template *v1.PodT
}
if newPod, err := r.KubeClient.Core().Pods(namespace).Create(pod); err != nil {
r.Recorder.Eventf(object, v1.EventTypeWarning, FailedCreatePodReason, "Error creating: %v", err)
return fmt.Errorf("unable to create pods: %v", err)
return err
} else {
accessor, err := meta.Accessor(object)
if err != nil {

View File

@ -816,7 +816,18 @@ func (dsc *DaemonSetsController) syncNodes(ds *extensions.DaemonSet, podsToDelet
for i := 0; i < createDiff; i++ {
go func(ix int) {
defer createWait.Done()
if err := dsc.podControl.CreatePodsOnNode(nodesNeedingDaemonPods[ix], ds.Namespace, &template, ds, newControllerRef(ds)); err != nil {
err := dsc.podControl.CreatePodsOnNode(nodesNeedingDaemonPods[ix], ds.Namespace, &template, ds, newControllerRef(ds))
if err != nil && errors.IsTimeout(err) {
// Pod is created but its initialization has timed out.
// If the initialization is successful eventually, the
// controller will observe the creation via the informer.
// If the initialization fails, or if the pod keeps
// uninitialized for a long time, the informer will not
// receive any update, and the controller will create a new
// pod when the expectation expires.
return
}
if err != nil {
glog.V(2).Infof("Failed creation, decrementing expectations for set %q/%q", ds.Namespace, ds.Name)
dsc.expectations.CreationObserved(dsKey)
errCh <- err

View File

@ -624,7 +624,18 @@ func (jm *JobController) manageJob(activePods []*v1.Pod, succeeded int32, job *b
for i := int32(0); i < diff; i++ {
go func() {
defer wait.Done()
if err := jm.podControl.CreatePodsWithControllerRef(job.Namespace, &job.Spec.Template, job, newControllerRef(job)); err != nil {
err := jm.podControl.CreatePodsWithControllerRef(job.Namespace, &job.Spec.Template, job, newControllerRef(job))
if err != nil && errors.IsTimeout(err) {
// Pod is created but its initialization has timed out.
// If the initialization is successful eventually, the
// controller will observe the creation via the informer.
// If the initialization fails, or if the pod keeps
// uninitialized for a long time, the informer will not
// receive any update, and the controller will create a new
// pod when the expectation expires.
return
}
if err != nil {
defer utilruntime.HandleError(err)
// Decrement the expected number of creates because the informer won't observe this pod
glog.V(2).Infof("Failed creation, decrementing expectations for job %q/%q", job.Namespace, job.Name)

View File

@ -466,6 +466,16 @@ func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *exte
Controller: boolPtr(true),
}
err = rsc.podControl.CreatePodsWithControllerRef(rs.Namespace, &rs.Spec.Template, rs, controllerRef)
if err != nil && errors.IsTimeout(err) {
// Pod is created but its initialization has timed out.
// If the initialization is successful eventually, the
// controller will observe the creation via the informer.
// If the initialization fails, or if the pod keeps
// uninitialized for a long time, the informer will not
// receive any update, and the controller will create a new
// pod when the expectation expires.
return
}
if err != nil {
// Decrement the expected number of creates because the informer won't observe this pod
glog.V(2).Infof("Failed creation, decrementing expectations for replica set %q/%q", rs.Namespace, rs.Name)

View File

@ -462,6 +462,16 @@ func (rm *ReplicationManager) manageReplicas(filteredPods []*v1.Pod, rc *v1.Repl
Controller: boolPtr(true),
}
err = rm.podControl.CreatePodsWithControllerRef(rc.Namespace, rc.Spec.Template, rc, controllerRef)
if err != nil && errors.IsTimeout(err) {
// Pod is created but its initialization has timed out.
// If the initialization is successful eventually, the
// controller will observe the creation via the informer.
// If the initialization fails, or if the pod keeps
// uninitialized for a long time, the informer will not
// receive any update, and the controller will create a new
// pod when the expectation expires.
return
}
if err != nil {
// Decrement the expected number of creates because the informer won't observe this pod
glog.V(2).Infof("Failed creation, decrementing expectations for controller %q/%q", rc.Namespace, rc.Name)

View File

@ -362,8 +362,9 @@ func (e *Store) WaitForInitialized(ctx genericapirequest.Context, obj runtime.Ob
select {
case event, ok := <-ch:
if !ok {
// TODO: should we just expose the partially initialized object?
return nil, kubeerr.NewServerTimeout(e.QualifiedResource, "create", 0)
msg := fmt.Sprintf("server has timed out waiting for the initialization of %s %s",
e.QualifiedResource.String(), accessor.GetName())
return nil, kubeerr.NewTimeoutError(msg, 0)
}
switch event.Type {
case watch.Deleted:

View File

@ -26,8 +26,10 @@ import (
"k8s.io/api/admissionregistration/v1alpha1"
"k8s.io/api/core/v1"
"k8s.io/api/extensions/v1beta1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
clientretry "k8s.io/kubernetes/pkg/client/retry"
@ -133,15 +135,7 @@ var _ = SIGDescribe("Initializers", func() {
Expect(err).NotTo(HaveOccurred())
// we must remove the initializer when the test is complete and ensure no pods are pending for that initializer
defer func() {
if err := c.AdmissionregistrationV1alpha1().InitializerConfigurations().Delete(initializerConfigName, nil); err != nil && !errors.IsNotFound(err) {
framework.Logf("got error on deleting %s", initializerConfigName)
}
// poller configuration is 1 second, wait at least that long
time.Sleep(3 * time.Second)
// clear our initializer from anyone who got it
removeInitializersFromAllPods(c, initializerName)
}()
defer cleanupInitializer(c, initializerConfigName, initializerName)
// poller configuration is 1 second, wait at least that long
time.Sleep(3 * time.Second)
@ -207,6 +201,67 @@ var _ = SIGDescribe("Initializers", func() {
Expect(pod.Initializers).To(BeNil())
Expect(pod.Annotations[v1.MirrorPodAnnotationKey]).To(Equal("true"))
})
It("don't cause replicaset controller creating extra pods if the initializer is not handled [Serial]", func() {
ns := f.Namespace.Name
c := f.ClientSet
podName := "uninitialized-pod"
framework.Logf("Creating pod %s", podName)
// create and register an initializer, without setting up a controller to handle it.
initializerName := "pod.test.e2e.kubernetes.io"
initializerConfigName := "e2e-test-initializer"
_, err := c.AdmissionregistrationV1alpha1().InitializerConfigurations().Create(&v1alpha1.InitializerConfiguration{
ObjectMeta: metav1.ObjectMeta{Name: initializerConfigName},
Initializers: []v1alpha1.Initializer{
{
Name: initializerName,
Rules: []v1alpha1.Rule{
{APIGroups: []string{""}, APIVersions: []string{"*"}, Resources: []string{"pods"}},
},
},
},
})
if errors.IsNotFound(err) {
framework.Skipf("dynamic configuration of initializers requires the alpha admissionregistration.k8s.io group to be enabled")
}
Expect(err).NotTo(HaveOccurred())
// we must remove the initializer when the test is complete and ensure no pods are pending for that initializer
defer cleanupInitializer(c, initializerConfigName, initializerName)
// poller configuration is 1 second, wait at least that long
time.Sleep(3 * time.Second)
// create a replicaset
persistedRS, err := c.ExtensionsV1beta1().ReplicaSets(ns).Create(newReplicaset())
Expect(err).NotTo(HaveOccurred())
// wait for replicaset controller to confirm that it has handled the creation
err = waitForRSObservedGeneration(c, persistedRS.Namespace, persistedRS.Name, persistedRS.Generation)
Expect(err).NotTo(HaveOccurred())
// update the replicaset spec to trigger a resync
patch := []byte(`{"spec":{"minReadySeconds":5}}`)
persistedRS, err = c.ExtensionsV1beta1().ReplicaSets(ns).Patch(persistedRS.Name, types.StrategicMergePatchType, patch)
Expect(err).NotTo(HaveOccurred())
// wait for replicaset controller to confirm that it has handle the spec update
err = waitForRSObservedGeneration(c, persistedRS.Namespace, persistedRS.Name, persistedRS.Generation)
Expect(err).NotTo(HaveOccurred())
// verify that the replicaset controller doesn't create extra pod
selector, err := metav1.LabelSelectorAsSelector(persistedRS.Spec.Selector)
Expect(err).NotTo(HaveOccurred())
listOptions := metav1.ListOptions{
LabelSelector: selector.String(),
IncludeUninitialized: true,
}
pods, err := c.Core().Pods(ns).List(listOptions)
Expect(err).NotTo(HaveOccurred())
Expect(len(pods.Items)).Should(Equal(1))
})
})
func newUninitializedPod(podName string) *v1.Pod {
@ -217,6 +272,34 @@ func newUninitializedPod(podName string) *v1.Pod {
return pod
}
// newReplicaset returns a single-replica ReplicaSet used by the initializer
// e2e tests to verify that the controller does not create extra pods while
// pod initialization is pending.
func newReplicaset() *v1beta1.ReplicaSet {
	const name = "initializer-test-replicaset"
	replicas := int32(1)
	labels := map[string]string{"initializer-test": "single-replicaset"}
	// Build the pod template first; the test relies only on the template
	// labels for pod selection (no explicit spec.selector is set).
	template := v1.PodTemplateSpec{
		ObjectMeta: metav1.ObjectMeta{
			Labels: labels,
		},
		Spec: v1.PodSpec{
			TerminationGracePeriodSeconds: &zero,
			Containers: []v1.Container{
				{
					Name:  name + "-container",
					Image: "gcr.io/google_containers/porter:4524579c0eb935c056c8e75563b4e1eda31587e0",
				},
			},
		},
	}
	return &v1beta1.ReplicaSet{
		ObjectMeta: metav1.ObjectMeta{
			Name: name,
		},
		Spec: v1beta1.ReplicaSetSpec{
			Replicas: &replicas,
			Template: template,
		},
	}
}
func newInitPod(podName string) *v1.Pod {
containerName := fmt.Sprintf("%s-container", podName)
port := 8080
@ -283,3 +366,28 @@ func removeInitializersFromAllPods(c clientset.Interface, initializerName string
}
}
}
// cleanupInitializer deletes the named InitializerConfiguration (tolerating
// a not-found error), waits long enough for the 1-second poller to observe
// the deletion, and then removes the initializer from any pods that already
// received it.
func cleanupInitializer(c clientset.Interface, initializerConfigName, initializerName string) {
	err := c.AdmissionregistrationV1alpha1().InitializerConfigurations().Delete(initializerConfigName, nil)
	if err != nil && !errors.IsNotFound(err) {
		framework.Logf("got error on deleting %s", initializerConfigName)
	}
	// poller configuration is 1 second, wait at least that long
	time.Sleep(3 * time.Second)
	// clear our initializer from anyone who got it
	removeInitializersFromAllPods(c, initializerName)
}
// waitForRSObservedGeneration polls (every second, for up to a minute) until
// the ReplicaSet's status.observedGeneration has caught up with the given
// generation, returning a non-nil error on timeout or on a failed GET.
func waitForRSObservedGeneration(c clientset.Interface, ns, name string, generation int64) error {
	return wait.PollImmediate(1*time.Second, 1*time.Minute, func() (bool, error) {
		rs, err := c.Extensions().ReplicaSets(ns).Get(name, metav1.GetOptions{})
		if err != nil {
			return false, err
		}
		// Done once the controller has observed at least this generation.
		return rs.Status.ObservedGeneration >= generation, nil
	})
}