diff --git a/pkg/controller/deployment/deployment_controller.go b/pkg/controller/deployment/deployment_controller.go index 78a892c9348..b4e3218d92d 100644 --- a/pkg/controller/deployment/deployment_controller.go +++ b/pkg/controller/deployment/deployment_controller.go @@ -441,7 +441,7 @@ func (dc *DeploymentController) syncDeployment(key string) error { // Rolling back to a revision; no-op if the toRevision is deployment's current revision func (dc *DeploymentController) rollback(deployment *extensions.Deployment, toRevision *int64) (*extensions.Deployment, error) { - newRS, allOldRSs, err := dc.getAllReplicaSets(*deployment) + newRS, allOldRSs, err := dc.getAllReplicaSets(*deployment, true) if err != nil { return nil, err } @@ -493,7 +493,8 @@ func (dc *DeploymentController) updateDeploymentAndClearRollbackTo(deployment *e } func (dc *DeploymentController) syncRecreateDeployment(deployment extensions.Deployment) error { - newRS, oldRSs, err := dc.getAllReplicaSets(deployment) + // Don't create a new RS if not already existed, so that we avoid scaling up before scaling down + newRS, oldRSs, err := dc.getAllReplicaSets(deployment, false) if err != nil { return err } @@ -509,6 +510,16 @@ func (dc *DeploymentController) syncRecreateDeployment(deployment extensions.Dep return dc.updateDeploymentStatus(allRSs, newRS, deployment) } + // If we need to create a new RS, create it now + // TODO: Create a new RS without re-listing all RSs. + if newRS == nil { + newRS, oldRSs, err = dc.getAllReplicaSets(deployment, true) + if err != nil { + return err + } + allRSs = append(oldRSs, newRS) + } + // scale up new replica set scaledUp, err := dc.scaleUpNewReplicaSetForRecreate(newRS, deployment) if err != nil { @@ -529,7 +540,7 @@ func (dc *DeploymentController) syncRecreateDeployment(deployment extensions.Dep } func (dc *DeploymentController) syncRollingUpdateDeployment(deployment extensions.Deployment) error { - newRS, oldRSs, err := dc.getAllReplicaSets(deployment) + newRS, oldRSs, err := dc.getAllReplicaSets(deployment, true) if err != nil { return err } @@ -577,7 +588,7 @@ func (dc *DeploymentController) syncDeploymentStatus(allRSs []*extensions.Replic } // getAllReplicaSets returns all the replica sets for the provided deployment (new and all old). -func (dc *DeploymentController) getAllReplicaSets(deployment extensions.Deployment) (*extensions.ReplicaSet, []*extensions.ReplicaSet, error) { +func (dc *DeploymentController) getAllReplicaSets(deployment extensions.Deployment, createIfNotExisted bool) (*extensions.ReplicaSet, []*extensions.ReplicaSet, error) { _, allOldRSs, err := dc.getOldReplicaSets(deployment) if err != nil { return nil, nil, err @@ -586,13 +597,13 @@ func (dc *DeploymentController) getAllReplicaSets(deployment extensions.Deployme maxOldV := maxRevision(allOldRSs) // Get new replica set with the updated revision number - newRS, err := dc.getNewReplicaSet(deployment, maxOldV) + newRS, err := dc.getNewReplicaSet(deployment, maxOldV, allOldRSs, createIfNotExisted) if err != nil { return nil, nil, err } // Sync deployment's revision number with new replica set - if newRS.Annotations != nil && len(newRS.Annotations[deploymentutil.RevisionAnnotation]) > 0 && + if newRS != nil && newRS.Annotations != nil && len(newRS.Annotations[deploymentutil.RevisionAnnotation]) > 0 && (deployment.Annotations == nil || deployment.Annotations[deploymentutil.RevisionAnnotation] != newRS.Annotations[deploymentutil.RevisionAnnotation]) { if err = dc.updateDeploymentRevision(deployment, newRS.Annotations[deploymentutil.RevisionAnnotation]); err != nil { glog.V(4).Infof("Error: %v. Unable to update deployment revision, will retry later.", err) @@ -648,7 +659,7 @@ func (dc *DeploymentController) getOldReplicaSets(deployment extensions.Deployme // Returns a replica set that matches the intent of the given deployment. // It creates a new replica set if required. // The revision of the new replica set will be updated to maxOldRevision + 1 -func (dc *DeploymentController) getNewReplicaSet(deployment extensions.Deployment, maxOldRevision int64) (*extensions.ReplicaSet, error) { +func (dc *DeploymentController) getNewReplicaSet(deployment extensions.Deployment, maxOldRevision int64, oldRSs []*extensions.ReplicaSet, createIfNotExisted bool) (*extensions.ReplicaSet, error) { // Calculate revision number for this new replica set newRevision := strconv.FormatInt(maxOldRevision+1, 10) @@ -665,6 +676,11 @@ func (dc *DeploymentController) getNewReplicaSet(deployment extensions.Deploymen } return existingNewRS, nil } + + if !createIfNotExisted { + return nil, nil + } + // Check the replica set expectations of the deployment before creating a new one. dKey, err := controller.KeyFunc(&deployment) if err != nil { @@ -701,14 +717,22 @@ func (dc *DeploymentController) getNewReplicaSet(deployment extensions.Deploymen } // Set new replica set's annotation setNewReplicaSetAnnotations(&deployment, &newRS, newRevision) + allRSs := append(oldRSs, &newRS) + newReplicasCount, err := deploymentutil.NewRSNewReplicas(&deployment, allRSs, &newRS) + if err != nil { + return nil, err + } + newRS.Spec.Replicas = newReplicasCount createdRS, err := dc.client.Extensions().ReplicaSets(namespace).Create(&newRS) if err != nil { dc.rsExpectations.DeleteExpectations(dKey) return nil, fmt.Errorf("error creating replica set: %v", err) } + if newReplicasCount > 0 { + dc.eventRecorder.Eventf(&deployment, api.EventTypeNormal, "ScalingReplicaSet", "Scaled %s replica set %s to %d", "up", createdRS.Name, newReplicasCount) + } - err = dc.updateDeploymentRevision(deployment, newRevision) - return createdRS, err + return createdRS, dc.updateDeploymentRevision(deployment, newRevision) } // setNewReplicaSetAnnotations sets new replica set's annotations appropriately by updating its revision and @@ -752,9 +776,12 @@ func (dc *DeploymentController) updateDeploymentRevision(deployment extensions.D if deployment.Annotations == nil { deployment.Annotations = make(map[string]string) } - deployment.Annotations[deploymentutil.RevisionAnnotation] = revision - _, err := dc.updateDeployment(&deployment) - return err + if deployment.Annotations[deploymentutil.RevisionAnnotation] != revision { + deployment.Annotations[deploymentutil.RevisionAnnotation] = revision + _, err := dc.updateDeployment(&deployment) + return err + } + return nil } func (dc *DeploymentController) reconcileNewReplicaSet(allRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, deployment extensions.Deployment) (bool, error) { @@ -764,29 +791,15 @@ func (dc *DeploymentController) reconcileNewReplicaSet(allRSs []*extensions.Repl } if newRS.Spec.Replicas > deployment.Spec.Replicas { // Scale down. - _, err := dc.scaleReplicaSetAndRecordEvent(newRS, deployment.Spec.Replicas, deployment) - return true, err + scaled, _, err := dc.scaleReplicaSetAndRecordEvent(newRS, deployment.Spec.Replicas, deployment) + return scaled, err } - // Check if we can scale up. - maxSurge, err := intstrutil.GetValueFromIntOrPercent(&deployment.Spec.Strategy.RollingUpdate.MaxSurge, deployment.Spec.Replicas) + newReplicasCount, err := deploymentutil.NewRSNewReplicas(&deployment, allRSs, newRS) if err != nil { return false, err } - - // Find the total number of pods - currentPodCount := deploymentutil.GetReplicaCountForReplicaSets(allRSs) - maxTotalPods := deployment.Spec.Replicas + maxSurge - if currentPodCount >= maxTotalPods { - // Cannot scale up. - return false, nil - } - // Scale up. - scaleUpCount := maxTotalPods - currentPodCount - // Do not exceed the number of desired replicas. - scaleUpCount = integer.IntMin(scaleUpCount, deployment.Spec.Replicas-newRS.Spec.Replicas) - newReplicasCount := newRS.Spec.Replicas + scaleUpCount - _, err = dc.scaleReplicaSetAndRecordEvent(newRS, newReplicasCount, deployment) - return true, err + scaled, _, err := dc.scaleReplicaSetAndRecordEvent(newRS, newReplicasCount, deployment) + return scaled, err } // Set expectationsCheck to false to bypass expectations check when testing @@ -903,7 +916,7 @@ func (dc *DeploymentController) cleanupUnhealthyReplicas(oldRSs []*extensions.Re scaledDownCount := integer.IntMin(maxCleanupCount-totalScaledDown, targetRS.Spec.Replicas-readyPodCount) newReplicasCount := targetRS.Spec.Replicas - scaledDownCount - _, err = dc.scaleReplicaSetAndRecordEvent(targetRS, newReplicasCount, deployment) + _, _, err = dc.scaleReplicaSetAndRecordEvent(targetRS, newReplicasCount, deployment) if err != nil { return totalScaledDown, err } @@ -949,7 +962,7 @@ func (dc *DeploymentController) scaleDownOldReplicaSetsForRollingUpdate(allRSs [ // Scale down. scaleDownCount := integer.IntMin(targetRS.Spec.Replicas, totalScaleDownCount-totalScaledDown) newReplicasCount := targetRS.Spec.Replicas - scaleDownCount - _, err = dc.scaleReplicaSetAndRecordEvent(targetRS, newReplicasCount, deployment) + _, _, err = dc.scaleReplicaSetAndRecordEvent(targetRS, newReplicasCount, deployment) if err != nil { return totalScaledDown, err } @@ -968,23 +981,21 @@ func (dc *DeploymentController) scaleDownOldReplicaSetsForRecreate(oldRSs []*ext if rs.Spec.Replicas == 0 { continue } - _, err := dc.scaleReplicaSetAndRecordEvent(rs, 0, deployment) + scaledRS, _, err := dc.scaleReplicaSetAndRecordEvent(rs, 0, deployment) if err != nil { return false, err } - scaled = true + if scaledRS { + scaled = true + } } return scaled, nil } // scaleUpNewReplicaSetForRecreate scales up new replica set when deployment strategy is "Recreate" func (dc *DeploymentController) scaleUpNewReplicaSetForRecreate(newRS *extensions.ReplicaSet, deployment extensions.Deployment) (bool, error) { - if newRS.Spec.Replicas == deployment.Spec.Replicas { - // Scaling not required. - return false, nil - } - _, err := dc.scaleReplicaSetAndRecordEvent(newRS, deployment.Spec.Replicas, deployment) - return true, err + scaled, _, err := dc.scaleReplicaSetAndRecordEvent(newRS, deployment.Spec.Replicas, deployment) + return scaled, err } func (dc *DeploymentController) cleanupOldReplicaSets(oldRSs []*extensions.ReplicaSet, deployment extensions.Deployment) error { @@ -1042,7 +1053,11 @@ func (dc *DeploymentController) calculateStatus(allRSs []*extensions.ReplicaSet, return } -func (dc *DeploymentController) scaleReplicaSetAndRecordEvent(rs *extensions.ReplicaSet, newScale int, deployment extensions.Deployment) (*extensions.ReplicaSet, error) { +func (dc *DeploymentController) scaleReplicaSetAndRecordEvent(rs *extensions.ReplicaSet, newScale int, deployment extensions.Deployment) (bool, *extensions.ReplicaSet, error) { + // No need to scale + if rs.Spec.Replicas == newScale { + return false, rs, nil + } scalingOperation := "down" if rs.Spec.Replicas < newScale { scalingOperation = "up" @@ -1051,7 +1066,7 @@ func (dc *DeploymentController) scaleReplicaSetAndRecordEvent(rs *extensions.Rep if err == nil { dc.eventRecorder.Eventf(&deployment, api.EventTypeNormal, "ScalingReplicaSet", "Scaled %s replica set %s to %d", scalingOperation, rs.Name, newScale) } - return newRS, err + return true, newRS, err } func (dc *DeploymentController) scaleReplicaSet(rs *extensions.ReplicaSet, newScale int) (*extensions.ReplicaSet, error) { diff --git a/pkg/util/deployment/deployment.go b/pkg/util/deployment/deployment.go index b02dcab1986..d69c48b13f1 100644 --- a/pkg/util/deployment/deployment.go +++ b/pkg/util/deployment/deployment.go @@ -26,6 +26,8 @@ import ( "k8s.io/kubernetes/pkg/apis/extensions" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" "k8s.io/kubernetes/pkg/labels" + "k8s.io/kubernetes/pkg/util/integer" + intstrutil "k8s.io/kubernetes/pkg/util/intstr" labelsutil "k8s.io/kubernetes/pkg/util/labels" podutil "k8s.io/kubernetes/pkg/util/pod" ) @@ -233,3 +235,38 @@ func Revision(rs *extensions.ReplicaSet) (int64, error) { } return strconv.ParseInt(v, 10, 64) } + +func IsRollingUpdate(deployment *extensions.Deployment) bool { + return deployment.Spec.Strategy.Type == extensions.RollingUpdateDeploymentStrategyType +} + +// NewRSNewReplicas calculates the number of replicas a deployment's new RS should have. +// When one of the followings is true, we're rolling out the deployment; otherwise, we're scaling it. +// 1) The new RS is saturated: newRS's replicas == deployment's replicas +// 2) Max number of pods allowed is reached: deployment's replicas + maxSurge == all RSs' replicas +func NewRSNewReplicas(deployment *extensions.Deployment, allRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet) (int, error) { + switch deployment.Spec.Strategy.Type { + case extensions.RollingUpdateDeploymentStrategyType: + // Check if we can scale up. + maxSurge, err := intstrutil.GetValueFromIntOrPercent(&deployment.Spec.Strategy.RollingUpdate.MaxSurge, deployment.Spec.Replicas) + if err != nil { + return 0, err + } + // Find the total number of pods + currentPodCount := GetReplicaCountForReplicaSets(allRSs) + maxTotalPods := deployment.Spec.Replicas + maxSurge + if currentPodCount >= maxTotalPods { + // Cannot scale up. + return newRS.Spec.Replicas, nil + } + // Scale up. + scaleUpCount := maxTotalPods - currentPodCount + // Do not exceed the number of desired replicas. + scaleUpCount = integer.IntMin(scaleUpCount, deployment.Spec.Replicas-newRS.Spec.Replicas) + return newRS.Spec.Replicas + scaleUpCount, nil + case extensions.RecreateDeploymentStrategyType: + return deployment.Spec.Replicas, nil + default: + return 0, fmt.Errorf("deployment type %v isn't supported", deployment.Spec.Strategy.Type) + } +}