From 97e07e5b52d310ac24d1d46572a4435e694a48f8 Mon Sep 17 00:00:00 2001 From: Chao Xu Date: Mon, 31 Jul 2017 18:07:46 -0700 Subject: [PATCH] Let controllers ignore initialization timeout error when creating a pod. --- pkg/controller/controller_utils.go | 2 +- pkg/controller/daemon/daemon_controller.go | 13 +- pkg/controller/job/job_controller.go | 13 +- pkg/controller/replicaset/replica_set.go | 10 ++ .../replication/replication_controller.go | 10 ++ .../pkg/registry/generic/registry/store.go | 5 +- test/e2e/apimachinery/initializers.go | 126 ++++++++++++++++-- 7 files changed, 165 insertions(+), 14 deletions(-) diff --git a/pkg/controller/controller_utils.go b/pkg/controller/controller_utils.go index 4a6d6863dab..bfb35279d24 100644 --- a/pkg/controller/controller_utils.go +++ b/pkg/controller/controller_utils.go @@ -580,7 +580,7 @@ func (r RealPodControl) createPods(nodeName, namespace string, template *v1.PodT } if newPod, err := r.KubeClient.Core().Pods(namespace).Create(pod); err != nil { r.Recorder.Eventf(object, v1.EventTypeWarning, FailedCreatePodReason, "Error creating: %v", err) - return fmt.Errorf("unable to create pods: %v", err) + return err } else { accessor, err := meta.Accessor(object) if err != nil { diff --git a/pkg/controller/daemon/daemon_controller.go b/pkg/controller/daemon/daemon_controller.go index 61919a412e0..17a41364516 100644 --- a/pkg/controller/daemon/daemon_controller.go +++ b/pkg/controller/daemon/daemon_controller.go @@ -816,7 +816,18 @@ func (dsc *DaemonSetsController) syncNodes(ds *extensions.DaemonSet, podsToDelet for i := 0; i < createDiff; i++ { go func(ix int) { defer createWait.Done() - if err := dsc.podControl.CreatePodsOnNode(nodesNeedingDaemonPods[ix], ds.Namespace, &template, ds, newControllerRef(ds)); err != nil { + err := dsc.podControl.CreatePodsOnNode(nodesNeedingDaemonPods[ix], ds.Namespace, &template, ds, newControllerRef(ds)) + if err != nil && errors.IsTimeout(err) { + // Pod is created but its initialization has 
timed out. + // If the initialization is successful eventually, the + // controller will observe the creation via the informer. + // If the initialization fails, or if the pod keeps + // uninitialized for a long time, the informer will not + // receive any update, and the controller will create a new + // pod when the expectation expires. + return + } + if err != nil { glog.V(2).Infof("Failed creation, decrementing expectations for set %q/%q", ds.Namespace, ds.Name) dsc.expectations.CreationObserved(dsKey) errCh <- err diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go index f59eb639afe..50e4f7ad151 100644 --- a/pkg/controller/job/job_controller.go +++ b/pkg/controller/job/job_controller.go @@ -624,7 +624,18 @@ func (jm *JobController) manageJob(activePods []*v1.Pod, succeeded int32, job *b for i := int32(0); i < diff; i++ { go func() { defer wait.Done() - if err := jm.podControl.CreatePodsWithControllerRef(job.Namespace, &job.Spec.Template, job, newControllerRef(job)); err != nil { + err := jm.podControl.CreatePodsWithControllerRef(job.Namespace, &job.Spec.Template, job, newControllerRef(job)) + if err != nil && errors.IsTimeout(err) { + // Pod is created but its initialization has timed out. + // If the initialization is successful eventually, the + // controller will observe the creation via the informer. + // If the initialization fails, or if the pod keeps + // uninitialized for a long time, the informer will not + // receive any update, and the controller will create a new + // pod when the expectation expires. 
+ return + } + if err != nil { defer utilruntime.HandleError(err) // Decrement the expected number of creates because the informer won't observe this pod glog.V(2).Infof("Failed creation, decrementing expectations for job %q/%q", job.Namespace, job.Name) diff --git a/pkg/controller/replicaset/replica_set.go b/pkg/controller/replicaset/replica_set.go index cca10f9c71c..97b27b7040c 100644 --- a/pkg/controller/replicaset/replica_set.go +++ b/pkg/controller/replicaset/replica_set.go @@ -466,6 +466,16 @@ func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *exte Controller: boolPtr(true), } err = rsc.podControl.CreatePodsWithControllerRef(rs.Namespace, &rs.Spec.Template, rs, controllerRef) + if err != nil && errors.IsTimeout(err) { + // Pod is created but its initialization has timed out. + // If the initialization is successful eventually, the + // controller will observe the creation via the informer. + // If the initialization fails, or if the pod keeps + // uninitialized for a long time, the informer will not + // receive any update, and the controller will create a new + // pod when the expectation expires. + return + } if err != nil { // Decrement the expected number of creates because the informer won't observe this pod glog.V(2).Infof("Failed creation, decrementing expectations for replica set %q/%q", rs.Namespace, rs.Name) diff --git a/pkg/controller/replication/replication_controller.go b/pkg/controller/replication/replication_controller.go index 72a2673dff4..c50fc790b0b 100644 --- a/pkg/controller/replication/replication_controller.go +++ b/pkg/controller/replication/replication_controller.go @@ -462,6 +462,16 @@ func (rm *ReplicationManager) manageReplicas(filteredPods []*v1.Pod, rc *v1.Repl Controller: boolPtr(true), } err = rm.podControl.CreatePodsWithControllerRef(rc.Namespace, rc.Spec.Template, rc, controllerRef) + if err != nil && errors.IsTimeout(err) { + // Pod is created but its initialization has timed out. 
+ // If the initialization is successful eventually, the + // controller will observe the creation via the informer. + // If the initialization fails, or if the pod keeps + // uninitialized for a long time, the informer will not + // receive any update, and the controller will create a new + // pod when the expectation expires. + return + } if err != nil { // Decrement the expected number of creates because the informer won't observe this pod glog.V(2).Infof("Failed creation, decrementing expectations for controller %q/%q", rc.Namespace, rc.Name) diff --git a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go index b5705f6fdc0..2e9801dc01d 100644 --- a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go +++ b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go @@ -362,8 +362,9 @@ func (e *Store) WaitForInitialized(ctx genericapirequest.Context, obj runtime.Ob select { case event, ok := <-ch: if !ok { - // TODO: should we just expose the partially initialized object? 
- return nil, kubeerr.NewServerTimeout(e.QualifiedResource, "create", 0) + msg := fmt.Sprintf("server has timed out waiting for the initialization of %s %s", + e.QualifiedResource.String(), accessor.GetName()) + return nil, kubeerr.NewTimeoutError(msg, 0) } switch event.Type { case watch.Deleted: diff --git a/test/e2e/apimachinery/initializers.go b/test/e2e/apimachinery/initializers.go index f81a089bdc4..c9b249461ff 100644 --- a/test/e2e/apimachinery/initializers.go +++ b/test/e2e/apimachinery/initializers.go @@ -26,8 +26,10 @@ import ( "k8s.io/api/admissionregistration/v1alpha1" "k8s.io/api/core/v1" + "k8s.io/api/extensions/v1beta1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" clientset "k8s.io/client-go/kubernetes" clientretry "k8s.io/kubernetes/pkg/client/retry" @@ -133,15 +135,7 @@ var _ = SIGDescribe("Initializers", func() { Expect(err).NotTo(HaveOccurred()) // we must remove the initializer when the test is complete and ensure no pods are pending for that initializer - defer func() { - if err := c.AdmissionregistrationV1alpha1().InitializerConfigurations().Delete(initializerConfigName, nil); err != nil && !errors.IsNotFound(err) { - framework.Logf("got error on deleting %s", initializerConfigName) - } - // poller configuration is 1 second, wait at least that long - time.Sleep(3 * time.Second) - // clear our initializer from anyone who got it - removeInitializersFromAllPods(c, initializerName) - }() + defer cleanupInitializer(c, initializerConfigName, initializerName) // poller configuration is 1 second, wait at least that long time.Sleep(3 * time.Second) @@ -207,6 +201,67 @@ var _ = SIGDescribe("Initializers", func() { Expect(pod.Initializers).To(BeNil()) Expect(pod.Annotations[v1.MirrorPodAnnotationKey]).To(Equal("true")) }) + + It("don't cause replicaset controller creating extra pods if the initializer is not handled [Serial]", func() { + ns := 
f.Namespace.Name + c := f.ClientSet + + podName := "uninitialized-pod" + framework.Logf("Creating pod %s", podName) + + // create and register an initializer, without setting up a controller to handle it. + initializerName := "pod.test.e2e.kubernetes.io" + initializerConfigName := "e2e-test-initializer" + _, err := c.AdmissionregistrationV1alpha1().InitializerConfigurations().Create(&v1alpha1.InitializerConfiguration{ + ObjectMeta: metav1.ObjectMeta{Name: initializerConfigName}, + Initializers: []v1alpha1.Initializer{ + { + Name: initializerName, + Rules: []v1alpha1.Rule{ + {APIGroups: []string{""}, APIVersions: []string{"*"}, Resources: []string{"pods"}}, + }, + }, + }, + }) + if errors.IsNotFound(err) { + framework.Skipf("dynamic configuration of initializers requires the alpha admissionregistration.k8s.io group to be enabled") + } + Expect(err).NotTo(HaveOccurred()) + + // we must remove the initializer when the test is complete and ensure no pods are pending for that initializer + defer cleanupInitializer(c, initializerConfigName, initializerName) + + // poller configuration is 1 second, wait at least that long + time.Sleep(3 * time.Second) + + // create a replicaset + persistedRS, err := c.ExtensionsV1beta1().ReplicaSets(ns).Create(newReplicaset()) + Expect(err).NotTo(HaveOccurred()) + // wait for replicaset controller to confirm that it has handled the creation + err = waitForRSObservedGeneration(c, persistedRS.Namespace, persistedRS.Name, persistedRS.Generation) + Expect(err).NotTo(HaveOccurred()) + + // update the replicaset spec to trigger a resync + patch := []byte(`{"spec":{"minReadySeconds":5}}`) + persistedRS, err = c.ExtensionsV1beta1().ReplicaSets(ns).Patch(persistedRS.Name, types.StrategicMergePatchType, patch) + Expect(err).NotTo(HaveOccurred()) + + // wait for replicaset controller to confirm that it has handled the spec update + err = waitForRSObservedGeneration(c, persistedRS.Namespace, persistedRS.Name, persistedRS.Generation) + 
Expect(err).NotTo(HaveOccurred()) + + // verify that the replicaset controller doesn't create extra pod + selector, err := metav1.LabelSelectorAsSelector(persistedRS.Spec.Selector) + Expect(err).NotTo(HaveOccurred()) + + listOptions := metav1.ListOptions{ + LabelSelector: selector.String(), + IncludeUninitialized: true, + } + pods, err := c.Core().Pods(ns).List(listOptions) + Expect(err).NotTo(HaveOccurred()) + Expect(len(pods.Items)).Should(Equal(1)) + }) }) func newUninitializedPod(podName string) *v1.Pod { @@ -217,6 +272,34 @@ func newUninitializedPod(podName string) *v1.Pod { return pod } +func newReplicaset() *v1beta1.ReplicaSet { + name := "initializer-test-replicaset" + replicas := int32(1) + labels := map[string]string{"initializer-test": "single-replicaset"} + return &v1beta1.ReplicaSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + Spec: v1beta1.ReplicaSetSpec{ + Replicas: &replicas, + Template: v1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: labels, + }, + Spec: v1.PodSpec{ + TerminationGracePeriodSeconds: &zero, + Containers: []v1.Container{ + { + Name: name + "-container", + Image: "gcr.io/google_containers/porter:4524579c0eb935c056c8e75563b4e1eda31587e0", + }, + }, + }, + }, + }, + } +} + func newInitPod(podName string) *v1.Pod { containerName := fmt.Sprintf("%s-container", podName) port := 8080 @@ -283,3 +366,28 @@ func removeInitializersFromAllPods(c clientset.Interface, initializerName string } } } + +// remove the initializerConfig, and remove the initializer from all pods +func cleanupInitializer(c clientset.Interface, initializerConfigName, initializerName string) { + if err := c.AdmissionregistrationV1alpha1().InitializerConfigurations().Delete(initializerConfigName, nil); err != nil && !errors.IsNotFound(err) { + framework.Logf("got error on deleting %s", initializerConfigName) + } + // poller configuration is 1 second, wait at least that long + time.Sleep(3 * time.Second) + // clear our initializer from anyone who got 
it + removeInitializersFromAllPods(c, initializerName) +} + +// waitForRSObservedGeneration waits until the RS status.observedGeneration matches metadata.generation. +func waitForRSObservedGeneration(c clientset.Interface, ns, name string, generation int64) error { + return wait.PollImmediate(1*time.Second, 1*time.Minute, func() (bool, error) { + rs, err := c.Extensions().ReplicaSets(ns).Get(name, metav1.GetOptions{}) + if err != nil { + return false, err + } + if generation > rs.Status.ObservedGeneration { + return false, nil + } + return true, nil + }) +}