From 267dae6435e105c3553096c97458717fb1dc5da6 Mon Sep 17 00:00:00 2001
From: Michail Kargakis
Date: Mon, 5 Dec 2016 02:14:46 +0100
Subject: [PATCH 1/2] controller: requeue replica sets for availability checks

---
 pkg/controller/replicaset/replica_set.go          | 28 +++++++++++++++++++
 .../replication/replication_controller.go         | 28 +++++++++++++++++++
 2 files changed, 56 insertions(+)

diff --git a/pkg/controller/replicaset/replica_set.go b/pkg/controller/replicaset/replica_set.go
index 0bbb9491f36..12efcbaedaf 100644
--- a/pkg/controller/replicaset/replica_set.go
+++ b/pkg/controller/replicaset/replica_set.go
@@ -343,8 +343,19 @@ func (rsc *ReplicaSetController) updatePod(old, cur interface{}) {
 		}
 	}
 
+	changedToReady := !v1.IsPodReady(oldPod) && v1.IsPodReady(curPod)
 	if curRS := rsc.getPodReplicaSet(curPod); curRS != nil {
 		rsc.enqueueReplicaSet(curRS)
+		// TODO: MinReadySeconds in the Pod will generate an Available condition to be added in
+		// the Pod status which in turn will trigger a requeue of the owning replica set thus
+		// having its status updated with the newly available replica. For now, we can fake the
+		// update by resyncing the controller MinReadySeconds after it is requeued because
+		// a Pod transitioned to Ready.
+		// Note that this still suffers from #29229, we are just moving the problem one level
+		// "closer" to kubelet (from the deployment to the replica set controller).
+		if changedToReady && curRS.Spec.MinReadySeconds > 0 {
+			rsc.enqueueReplicaSetAfter(curRS, time.Duration(curRS.Spec.MinReadySeconds)*time.Second)
+		}
 	}
 }
 
@@ -398,6 +409,23 @@ func (rsc *ReplicaSetController) enqueueReplicaSet(obj interface{}) {
 	rsc.queue.Add(key)
 }
 
+// obj could be an *extensions.ReplicaSet, or a DeletionFinalStateUnknown marker item.
+func (rsc *ReplicaSetController) enqueueReplicaSetAfter(obj interface{}, after time.Duration) {
+	key, err := controller.KeyFunc(obj)
+	if err != nil {
+		utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
+		return
+	}
+
+	// TODO: Handle overlapping replica sets better. Either disallow them at admission time or
+	// deterministically avoid syncing replica sets that fight over pods. Currently, we only
+	// ensure that the same replica set is synced for a given pod. When we periodically relist
+	// all replica sets there will still be some replica instability. One way to handle this is
+	// by querying the store for all replica sets that this replica set overlaps, as well as all
+	// replica sets that overlap this ReplicaSet, and sorting them.
+	rsc.queue.AddAfter(key, after)
+}
+
 // worker runs a worker thread that just dequeues items, processes them, and marks them done.
 // It enforces that the syncHandler is never invoked concurrently with the same key.
 func (rsc *ReplicaSetController) worker() {
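Note: the delayed requeue above relies on the controller work queue supporting delayed adds. Below is a minimal, self-contained sketch of that interaction, assuming k8s.io/client-go/util/workqueue (the controllers vendor an equivalent workqueue package); the key, the MinReadySeconds value, and the readiness flag are made up for illustration, and this is not the controller code itself.

package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	// Delaying work queue keyed by namespace/name, the same shape the controllers use.
	queue := workqueue.NewDelayingQueue()
	defer queue.ShutDown()

	// Hypothetical values standing in for the owning replica set and the pod update.
	rsKey := "default/frontend"
	minReadySeconds := int32(5)
	changedToReady := true // oldPod was not ready, curPod is ready

	// Immediate sync, as enqueueReplicaSet does on every pod update.
	queue.Add(rsKey)

	// Delayed sync, as enqueueReplicaSetAfter does: once MinReadySeconds have passed,
	// the pod can be counted as available, so the owner is synced a second time.
	if changedToReady && minReadySeconds > 0 {
		queue.AddAfter(rsKey, time.Duration(minReadySeconds)*time.Second)
	}

	for i := 0; i < 2; i++ {
		key, shutdown := queue.Get()
		if shutdown {
			return
		}
		fmt.Printf("syncing %v at %s\n", key, time.Now().Format(time.RFC3339))
		queue.Done(key)
	}
}

Running the sketch prints one sync immediately and a second one roughly MinReadySeconds later, which is exactly the window in which availableReplicas would otherwise go stale.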
diff --git a/pkg/controller/replication/replication_controller.go b/pkg/controller/replication/replication_controller.go
index bcb5192d3d7..9c8a5f07f57 100644
--- a/pkg/controller/replication/replication_controller.go
+++ b/pkg/controller/replication/replication_controller.go
@@ -400,8 +400,19 @@ func (rm *ReplicationManager) updatePod(old, cur interface{}) {
 		}
 	}
 
+	changedToReady := !v1.IsPodReady(oldPod) && v1.IsPodReady(curPod)
 	if curRC := rm.getPodController(curPod); curRC != nil {
 		rm.enqueueController(curRC)
+		// TODO: MinReadySeconds in the Pod will generate an Available condition to be added in
+		// the Pod status which in turn will trigger a requeue of the owning replication controller
+		// thus having its status updated with the newly available replica. For now, we can fake the
+		// update by resyncing the controller MinReadySeconds after it is requeued because a Pod
+		// transitioned to Ready.
+		// Note that this still suffers from #29229, we are just moving the problem one level
+		// "closer" to kubelet (from the deployment to the replication controller manager).
+		if changedToReady && curRC.Spec.MinReadySeconds > 0 {
+			rm.enqueueControllerAfter(curRC, time.Duration(curRC.Spec.MinReadySeconds)*time.Second)
+		}
 	}
 }
 
@@ -455,6 +466,23 @@ func (rm *ReplicationManager) enqueueController(obj interface{}) {
 	rm.queue.Add(key)
 }
 
+// obj could be a *v1.ReplicationController, or a DeletionFinalStateUnknown marker item.
+func (rm *ReplicationManager) enqueueControllerAfter(obj interface{}, after time.Duration) {
+	key, err := controller.KeyFunc(obj)
+	if err != nil {
+		glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
+		return
+	}
+
+	// TODO: Handle overlapping controllers better. Either disallow them at admission time or
+	// deterministically avoid syncing controllers that fight over pods. Currently, we only
+	// ensure that the same controller is synced for a given pod. When we periodically relist
+	// all controllers there will still be some replica instability. One way to handle this is
+	// by querying the store for all controllers that this rc overlaps, as well as all
+	// controllers that overlap this rc, and sorting them.
+	rm.queue.AddAfter(key, after)
+}
+
 // worker runs a worker thread that just dequeues items, processes them, and marks them done.
 // It enforces that the syncHandler is never invoked concurrently with the same key.
 func (rm *ReplicationManager) worker() {
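Note: what the delayed sync recomputes is availability. A replica only counts as available once its Ready condition has held for at least minReadySeconds, so the immediate sync triggered by the Ready transition runs too early to count it. The sketch below is a rough restatement of that rule for illustration only, not the actual API helper; the podView type and the timestamps are invented.

package main

import (
	"fmt"
	"time"
)

// podView is a stand-in for the few PodStatus fields the rule needs.
type podView struct {
	ready      bool
	readySince time.Time // lastTransitionTime of the Ready condition
}

// isAvailable mirrors the rule the requeued sync re-evaluates: ready now,
// and ready for at least minReadySeconds.
func isAvailable(p podView, minReadySeconds int32, now time.Time) bool {
	if !p.ready {
		return false
	}
	if minReadySeconds == 0 {
		return true
	}
	return !p.readySince.IsZero() &&
		now.Sub(p.readySince) >= time.Duration(minReadySeconds)*time.Second
}

func main() {
	now := time.Now()
	justReady := podView{ready: true, readySince: now.Add(-2 * time.Second)}
	oldReady := podView{ready: true, readySince: now.Add(-30 * time.Second)}

	// false: this is the window the MinReadySeconds requeue covers.
	fmt.Println(isAvailable(justReady, 10, now))
	// true: already counted as available on the immediate sync.
	fmt.Println(isAvailable(oldReady, 10, now))
}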
From 4dfe2a234817ed8c08f2a8573b218c488a66b572 Mon Sep 17 00:00:00 2001
From: Michail Kargakis
Date: Mon, 5 Dec 2016 02:31:13 +0100
Subject: [PATCH 2/2] test: update e2e rollover test to use minReadySeconds

---
 test/e2e/deployment.go | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git a/test/e2e/deployment.go b/test/e2e/deployment.go
index 4c3efe7d70f..a532cca0ac1 100644
--- a/test/e2e/deployment.go
+++ b/test/e2e/deployment.go
@@ -557,25 +557,29 @@ func testRolloverDeployment(f *framework.Framework) {
 	deploymentReplicas := int32(4)
 	deploymentImage := "gcr.io/google_samples/gb-redisslave:nonexistent"
 	deploymentStrategyType := extensions.RollingUpdateDeploymentStrategyType
-	framework.Logf("Creating deployment %s", deploymentName)
+	framework.Logf("Creating deployment %q", deploymentName)
 	newDeployment := newDeployment(deploymentName, deploymentReplicas, deploymentPodLabels, deploymentImageName, deploymentImage, deploymentStrategyType, nil)
 	newDeployment.Spec.Strategy.RollingUpdate = &extensions.RollingUpdateDeployment{
 		MaxUnavailable: func(i int) *intstr.IntOrString { x := intstr.FromInt(i); return &x }(1),
 		MaxSurge:       func(i int) *intstr.IntOrString { x := intstr.FromInt(i); return &x }(1),
 	}
+	newDeployment.Spec.MinReadySeconds = int32(10)
 	_, err = c.Extensions().Deployments(ns).Create(newDeployment)
 	Expect(err).NotTo(HaveOccurred())
 
 	// Verify that the pods were scaled up and down as expected.
 	deployment, err := c.Extensions().Deployments(ns).Get(deploymentName)
 	Expect(err).NotTo(HaveOccurred())
+	framework.Logf("Make sure deployment %q performs scaling operations", deploymentName)
 	// Make sure the deployment starts to scale up and down replica sets by checking if its updated replicas >= 1
 	err = framework.WaitForDeploymentUpdatedReplicasLTE(c, ns, deploymentName, 1, deployment.Generation)
 	// Check if it's updated to revision 1 correctly
+	framework.Logf("Check revision of new replica set for deployment %q", deploymentName)
 	_, newRS := checkDeploymentRevision(c, ns, deploymentName, "1", deploymentImageName, deploymentImage)
 
 	// Before the deployment finishes, update the deployment to rollover the above 2 ReplicaSets and bring up redis pods.
 	Expect(*newRS.Spec.Replicas).Should(BeNumerically("<", deploymentReplicas))
+	framework.Logf("Update deployment %q with a new image", deploymentName)
 	updatedDeploymentImageName, updatedDeploymentImage := redisImageName, redisImage
 	deployment, err = framework.UpdateDeploymentWithRetries(c, ns, newDeployment.Name, func(update *extensions.Deployment) {
 		update.Spec.Template.Spec.Containers[0].Name = updatedDeploymentImageName
@@ -584,13 +588,16 @@ func testRolloverDeployment(f *framework.Framework) {
 	Expect(err).NotTo(HaveOccurred())
 
 	// Use observedGeneration to determine if the controller noticed the pod template update.
+	framework.Logf("Wait for deployment %q to be observed by the deployment controller", deploymentName)
 	err = framework.WaitForObservedDeployment(c, ns, deploymentName, deployment.Generation)
 	Expect(err).NotTo(HaveOccurred())
 
 	// Wait for it to be updated to revision 2
+	framework.Logf("Wait for revision update of deployment %q to 2", deploymentName)
 	err = framework.WaitForDeploymentRevisionAndImage(c, ns, deploymentName, "2", updatedDeploymentImage)
 	Expect(err).NotTo(HaveOccurred())
 
+	framework.Logf("Make sure deployment %q is complete", deploymentName)
 	err = framework.WaitForDeploymentStatus(c, deployment)
 	Expect(err).NotTo(HaveOccurred())
 }
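Note: the test relies on framework.WaitForDeploymentStatus to block until the rollout completes, and with minReadySeconds set that can only succeed if availableReplicas is kept current, which is what the controller requeue in the first patch provides. Below is a hedged sketch of that kind of completion check written against a current client-go; the apps/v1 group, client construction, and poll intervals are assumptions for illustration, since the 2016 e2e framework used extensions/v1beta1 and its own helpers.

package e2esketch

import (
	"context"
	"fmt"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/kubernetes"
)

// waitForDeploymentComplete polls until the deployment controller has observed
// the latest generation and the updated and available replica counts match
// spec.replicas. With minReadySeconds > 0, availableReplicas lags readiness,
// so this only converges if the replica set status is resynced in time.
func waitForDeploymentComplete(c kubernetes.Interface, ns, name string) error {
	return wait.PollImmediate(2*time.Second, 5*time.Minute, func() (bool, error) {
		d, err := c.AppsV1().Deployments(ns).Get(context.TODO(), name, metav1.GetOptions{})
		if err != nil {
			return false, err
		}
		if d.Spec.Replicas == nil {
			return false, fmt.Errorf("deployment %s/%s has no replica count", ns, name)
		}
		done := d.Status.ObservedGeneration >= d.Generation &&
			d.Status.UpdatedReplicas == *d.Spec.Replicas &&
			d.Status.AvailableReplicas == *d.Spec.Replicas
		return done, nil
	})
}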