From 39576b47baad5f7ecd9b57a7a365693b880dc20d Mon Sep 17 00:00:00 2001 From: Michail Kargakis Date: Mon, 20 Jun 2016 11:27:15 +0200 Subject: [PATCH] controller: wait for synced old replica sets on Recreate --- pkg/controller/deployment/recreate.go | 25 ++++++++++++++++++++++++- pkg/util/replicaset/replicaset.go | 22 ++++++++++++++++++++++ 2 files changed, 46 insertions(+), 1 deletion(-) diff --git a/pkg/controller/deployment/recreate.go b/pkg/controller/deployment/recreate.go index ccd968348ca..385f5a2ad22 100644 --- a/pkg/controller/deployment/recreate.go +++ b/pkg/controller/deployment/recreate.go @@ -18,7 +18,10 @@ package deployment import ( "k8s.io/kubernetes/pkg/apis/extensions" + unversionedclient "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/controller" + rsutil "k8s.io/kubernetes/pkg/util/replicaset" + "k8s.io/kubernetes/pkg/util/wait" ) // rolloutRecreate implements the logic for recreating a replica set. @@ -29,9 +32,10 @@ func (dc *DeploymentController) rolloutRecreate(deployment *extensions.Deploymen return err } allRSs := append(oldRSs, newRS) + activeOldRSs := controller.FilterActiveReplicaSets(oldRSs) // scale down old replica sets - scaledDown, err := dc.scaleDownOldReplicaSetsForRecreate(controller.FilterActiveReplicaSets(oldRSs), deployment) + scaledDown, err := dc.scaleDownOldReplicaSetsForRecreate(activeOldRSs, deployment) if err != nil { return err } @@ -40,6 +44,11 @@ func (dc *DeploymentController) rolloutRecreate(deployment *extensions.Deploymen return dc.updateDeploymentStatus(allRSs, newRS, deployment) } + // Wait for all old replica sets to scale down to zero. + if err := dc.waitForInactiveReplicaSets(activeOldRSs); err != nil { + return err + } + // If we need to create a new RS, create it now // TODO: Create a new RS without re-listing all RSs. 
if newRS == nil { @@ -85,6 +94,20 @@ func (dc *DeploymentController) scaleDownOldReplicaSetsForRecreate(oldRSs []*ext return scaled, nil } +// waitForInactiveReplicaSets will wait until all passed replica sets are inactive and have been noticed +// by the replica set controller. +func (dc *DeploymentController) waitForInactiveReplicaSets(oldRSs []*extensions.ReplicaSet) error { + for i := range oldRSs { + rs := oldRSs[i] + + condition := rsutil.ReplicaSetIsInactive(dc.client.Extensions(), rs) + if err := wait.ExponentialBackoff(unversionedclient.DefaultRetry, condition); err != nil { + return err + } + } + return nil + } + // scaleUpNewReplicaSetForRecreate scales up new replica set when deployment strategy is "Recreate" func (dc *DeploymentController) scaleUpNewReplicaSetForRecreate(newRS *extensions.ReplicaSet, deployment *extensions.Deployment) (bool, error) { scaled, _, err := dc.scaleReplicaSetAndRecordEvent(newRS, deployment.Spec.Replicas, deployment) diff --git a/pkg/util/replicaset/replicaset.go b/pkg/util/replicaset/replicaset.go index 206bba2dc15..0ac51203a5b 100644 --- a/pkg/util/replicaset/replicaset.go +++ b/pkg/util/replicaset/replicaset.go @@ -108,3 +108,25 @@ func MatchingPodsFunc(rs *extensions.ReplicaSet) (func(api.Pod) bool, error) { return selector.Matches(podLabelsSelector) }, nil } + +// ReplicaSetIsInactive returns a condition that will be true when a replica set is inactive, i.e. +// it has zero running replicas. +func ReplicaSetIsInactive(c unversionedextensions.ExtensionsInterface, replicaSet *extensions.ReplicaSet) wait.ConditionFunc { + + // If we're given a ReplicaSet where the status lags the spec, it either means that the + // ReplicaSet is stale, or that the ReplicaSet manager hasn't noticed the update yet. + // Polling status.Replicas is not safe in the latter case. 
+ desiredGeneration := replicaSet.Generation + + return func() (bool, error) { + rs, err := c.ReplicaSets(replicaSet.Namespace).Get(replicaSet.Name) + if err != nil { + return false, err + } + + return rs.Status.ObservedGeneration >= desiredGeneration && + rs.Spec.Replicas == 0 && + rs.Status.Replicas == 0 && + rs.Status.FullyLabeledReplicas == 0, nil + } +}