mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-06 02:34:03 +00:00
Merge pull request #38076 from kargakis/requeue-replica-sets-for-availability-checks
Automatic merge from submit-queue (batch tested with PRs 38076, 38137, 36882, 37634, 37558) Requeue replica sets for availability checks See https://github.com/kubernetes/kubernetes/issues/35355#issuecomment-264746268 for more details @kubernetes/deployment @smarterclayton
This commit is contained in:
commit
f587c7a49f
@ -343,8 +343,19 @@ func (rsc *ReplicaSetController) updatePod(old, cur interface{}) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
changedToReady := !v1.IsPodReady(oldPod) && v1.IsPodReady(curPod)
|
||||||
if curRS := rsc.getPodReplicaSet(curPod); curRS != nil {
|
if curRS := rsc.getPodReplicaSet(curPod); curRS != nil {
|
||||||
rsc.enqueueReplicaSet(curRS)
|
rsc.enqueueReplicaSet(curRS)
|
||||||
|
// TODO: MinReadySeconds in the Pod will generate an Available condition to be added in
|
||||||
|
// the Pod status which in turn will trigger a requeue of the owning replica set thus
|
||||||
|
// having its status updated with the newly available replica. For now, we can fake the
|
||||||
|
// update by resyncing the controller MinReadySeconds after the it is requeued because
|
||||||
|
// a Pod transitioned to Ready.
|
||||||
|
// Note that this still suffers from #29229, we are just moving the problem one level
|
||||||
|
// "closer" to kubelet (from the deployment to the replica set controller).
|
||||||
|
if changedToReady && curRS.Spec.MinReadySeconds > 0 {
|
||||||
|
rsc.enqueueReplicaSetAfter(curRS, time.Duration(curRS.Spec.MinReadySeconds)*time.Second)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -398,6 +409,23 @@ func (rsc *ReplicaSetController) enqueueReplicaSet(obj interface{}) {
|
|||||||
rsc.queue.Add(key)
|
rsc.queue.Add(key)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// obj could be an *extensions.ReplicaSet, or a DeletionFinalStateUnknown marker item.
|
||||||
|
func (rsc *ReplicaSetController) enqueueReplicaSetAfter(obj interface{}, after time.Duration) {
|
||||||
|
key, err := controller.KeyFunc(obj)
|
||||||
|
if err != nil {
|
||||||
|
utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: Handle overlapping replica sets better. Either disallow them at admission time or
|
||||||
|
// deterministically avoid syncing replica sets that fight over pods. Currently, we only
|
||||||
|
// ensure that the same replica set is synced for a given pod. When we periodically relist
|
||||||
|
// all replica sets there will still be some replica instability. One way to handle this is
|
||||||
|
// by querying the store for all replica sets that this replica set overlaps, as well as all
|
||||||
|
// replica sets that overlap this ReplicaSet, and sorting them.
|
||||||
|
rsc.queue.AddAfter(key, after)
|
||||||
|
}
|
||||||
|
|
||||||
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
|
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
|
||||||
// It enforces that the syncHandler is never invoked concurrently with the same key.
|
// It enforces that the syncHandler is never invoked concurrently with the same key.
|
||||||
func (rsc *ReplicaSetController) worker() {
|
func (rsc *ReplicaSetController) worker() {
|
||||||
|
@ -400,8 +400,19 @@ func (rm *ReplicationManager) updatePod(old, cur interface{}) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
changedToReady := !v1.IsPodReady(oldPod) && v1.IsPodReady(curPod)
|
||||||
if curRC := rm.getPodController(curPod); curRC != nil {
|
if curRC := rm.getPodController(curPod); curRC != nil {
|
||||||
rm.enqueueController(curRC)
|
rm.enqueueController(curRC)
|
||||||
|
// TODO: MinReadySeconds in the Pod will generate an Available condition to be added in
|
||||||
|
// the Pod status which in turn will trigger a requeue of the owning replication controller
|
||||||
|
// thus having its status updated with the newly available replica. For now, we can fake the
|
||||||
|
// update by resyncing the controller MinReadySeconds after the it is requeued because a Pod
|
||||||
|
// transitioned to Ready.
|
||||||
|
// Note that this still suffers from #29229, we are just moving the problem one level
|
||||||
|
// "closer" to kubelet (from the deployment to the replication controller manager).
|
||||||
|
if changedToReady && curRC.Spec.MinReadySeconds > 0 {
|
||||||
|
rm.enqueueControllerAfter(curRC, time.Duration(curRC.Spec.MinReadySeconds)*time.Second)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -455,6 +466,23 @@ func (rm *ReplicationManager) enqueueController(obj interface{}) {
|
|||||||
rm.queue.Add(key)
|
rm.queue.Add(key)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// obj could be an *v1.ReplicationController, or a DeletionFinalStateUnknown marker item.
|
||||||
|
func (rm *ReplicationManager) enqueueControllerAfter(obj interface{}, after time.Duration) {
|
||||||
|
key, err := controller.KeyFunc(obj)
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: Handle overlapping controllers better. Either disallow them at admission time or
|
||||||
|
// deterministically avoid syncing controllers that fight over pods. Currently, we only
|
||||||
|
// ensure that the same controller is synced for a given pod. When we periodically relist
|
||||||
|
// all controllers there will still be some replica instability. One way to handle this is
|
||||||
|
// by querying the store for all controllers that this rc overlaps, as well as all
|
||||||
|
// controllers that overlap this rc, and sorting them.
|
||||||
|
rm.queue.AddAfter(key, after)
|
||||||
|
}
|
||||||
|
|
||||||
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
|
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
|
||||||
// It enforces that the syncHandler is never invoked concurrently with the same key.
|
// It enforces that the syncHandler is never invoked concurrently with the same key.
|
||||||
func (rm *ReplicationManager) worker() {
|
func (rm *ReplicationManager) worker() {
|
||||||
|
@ -558,25 +558,29 @@ func testRolloverDeployment(f *framework.Framework) {
|
|||||||
deploymentReplicas := int32(4)
|
deploymentReplicas := int32(4)
|
||||||
deploymentImage := "gcr.io/google_samples/gb-redisslave:nonexistent"
|
deploymentImage := "gcr.io/google_samples/gb-redisslave:nonexistent"
|
||||||
deploymentStrategyType := extensions.RollingUpdateDeploymentStrategyType
|
deploymentStrategyType := extensions.RollingUpdateDeploymentStrategyType
|
||||||
framework.Logf("Creating deployment %s", deploymentName)
|
framework.Logf("Creating deployment %q", deploymentName)
|
||||||
newDeployment := newDeployment(deploymentName, deploymentReplicas, deploymentPodLabels, deploymentImageName, deploymentImage, deploymentStrategyType, nil)
|
newDeployment := newDeployment(deploymentName, deploymentReplicas, deploymentPodLabels, deploymentImageName, deploymentImage, deploymentStrategyType, nil)
|
||||||
newDeployment.Spec.Strategy.RollingUpdate = &extensions.RollingUpdateDeployment{
|
newDeployment.Spec.Strategy.RollingUpdate = &extensions.RollingUpdateDeployment{
|
||||||
MaxUnavailable: func(i int) *intstr.IntOrString { x := intstr.FromInt(i); return &x }(1),
|
MaxUnavailable: func(i int) *intstr.IntOrString { x := intstr.FromInt(i); return &x }(1),
|
||||||
MaxSurge: func(i int) *intstr.IntOrString { x := intstr.FromInt(i); return &x }(1),
|
MaxSurge: func(i int) *intstr.IntOrString { x := intstr.FromInt(i); return &x }(1),
|
||||||
}
|
}
|
||||||
|
newDeployment.Spec.MinReadySeconds = int32(10)
|
||||||
_, err = c.Extensions().Deployments(ns).Create(newDeployment)
|
_, err = c.Extensions().Deployments(ns).Create(newDeployment)
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
// Verify that the pods were scaled up and down as expected.
|
// Verify that the pods were scaled up and down as expected.
|
||||||
deployment, err := c.Extensions().Deployments(ns).Get(deploymentName)
|
deployment, err := c.Extensions().Deployments(ns).Get(deploymentName)
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
framework.Logf("Make sure deployment %q performs scaling operations", deploymentName)
|
||||||
// Make sure the deployment starts to scale up and down replica sets by checking if its updated replicas >= 1
|
// Make sure the deployment starts to scale up and down replica sets by checking if its updated replicas >= 1
|
||||||
err = framework.WaitForDeploymentUpdatedReplicasLTE(c, ns, deploymentName, 1, deployment.Generation)
|
err = framework.WaitForDeploymentUpdatedReplicasLTE(c, ns, deploymentName, 1, deployment.Generation)
|
||||||
// Check if it's updated to revision 1 correctly
|
// Check if it's updated to revision 1 correctly
|
||||||
|
framework.Logf("Check revision of new replica set for deployment %q", deploymentName)
|
||||||
_, newRS := checkDeploymentRevision(c, ns, deploymentName, "1", deploymentImageName, deploymentImage)
|
_, newRS := checkDeploymentRevision(c, ns, deploymentName, "1", deploymentImageName, deploymentImage)
|
||||||
|
|
||||||
// Before the deployment finishes, update the deployment to rollover the above 2 ReplicaSets and bring up redis pods.
|
// Before the deployment finishes, update the deployment to rollover the above 2 ReplicaSets and bring up redis pods.
|
||||||
Expect(*newRS.Spec.Replicas).Should(BeNumerically("<", deploymentReplicas))
|
Expect(*newRS.Spec.Replicas).Should(BeNumerically("<", deploymentReplicas))
|
||||||
|
framework.Logf("Make sure deployment %q with new image", deploymentName)
|
||||||
updatedDeploymentImageName, updatedDeploymentImage := redisImageName, redisImage
|
updatedDeploymentImageName, updatedDeploymentImage := redisImageName, redisImage
|
||||||
deployment, err = framework.UpdateDeploymentWithRetries(c, ns, newDeployment.Name, func(update *extensions.Deployment) {
|
deployment, err = framework.UpdateDeploymentWithRetries(c, ns, newDeployment.Name, func(update *extensions.Deployment) {
|
||||||
update.Spec.Template.Spec.Containers[0].Name = updatedDeploymentImageName
|
update.Spec.Template.Spec.Containers[0].Name = updatedDeploymentImageName
|
||||||
@ -585,13 +589,16 @@ func testRolloverDeployment(f *framework.Framework) {
|
|||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
// Use observedGeneration to determine if the controller noticed the pod template update.
|
// Use observedGeneration to determine if the controller noticed the pod template update.
|
||||||
|
framework.Logf("Wait deployment %q to be observed by the deployment controller", deploymentName)
|
||||||
err = framework.WaitForObservedDeployment(c, ns, deploymentName, deployment.Generation)
|
err = framework.WaitForObservedDeployment(c, ns, deploymentName, deployment.Generation)
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
// Wait for it to be updated to revision 2
|
// Wait for it to be updated to revision 2
|
||||||
|
framework.Logf("Wait for revision update of deployment %q to 2", deploymentName)
|
||||||
err = framework.WaitForDeploymentRevisionAndImage(c, ns, deploymentName, "2", updatedDeploymentImage)
|
err = framework.WaitForDeploymentRevisionAndImage(c, ns, deploymentName, "2", updatedDeploymentImage)
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
|
framework.Logf("Make sure deployment %q is complete", deploymentName)
|
||||||
err = framework.WaitForDeploymentStatus(c, deployment)
|
err = framework.WaitForDeploymentStatus(c, deployment)
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user