diff --git a/pkg/controller/deployment/rollback.go b/pkg/controller/deployment/rollback.go index aabfbc42ad8..a82db34dc22 100644 --- a/pkg/controller/deployment/rollback.go +++ b/pkg/controller/deployment/rollback.go @@ -23,7 +23,7 @@ import ( "github.com/golang/glog" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/apis/extensions" - deploymentutil "k8s.io/kubernetes/pkg/util/deployment" + deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util" ) // Rolling back to a revision; no-op if the toRevision is deployment's current revision @@ -35,7 +35,7 @@ func (dc *DeploymentController) rollback(deployment *extensions.Deployment, toRe allRSs := append(allOldRSs, newRS) // If rollback revision is 0, rollback to the last revision if *toRevision == 0 { - if *toRevision = lastRevision(allRSs); *toRevision == 0 { + if *toRevision = deploymentutil.LastRevision(allRSs); *toRevision == 0 { // If we still can't find the last revision, gives up rollback dc.emitRollbackWarningEvent(deployment, deploymentutil.RollbackRevisionNotFound, "Unable to find last revision.") // Gives up rollback @@ -79,7 +79,7 @@ func (dc *DeploymentController) rollbackToTemplate(deployment *extensions.Deploy // // If we don't copy the annotations back from RS to deployment on rollback, the Deployment will stay as {change-cause:edit}, // and new RS1 becomes {change-cause:edit} (copied from deployment after rollback), old RS2 {change-cause:edit}, which is not correct. - setDeploymentAnnotationsTo(deployment, rs) + deploymentutil.SetDeploymentAnnotationsTo(deployment, rs) performedRollback = true } else { glog.V(4).Infof("Rolling back to a revision that contains the same template as current deployment %s, skipping rollback...", deployment.Name) diff --git a/pkg/controller/deployment/rolling.go b/pkg/controller/deployment/rolling.go index 24997bb60bf..0b5e5af8c28 100644 --- a/pkg/controller/deployment/rolling.go +++ b/pkg/controller/deployment/rolling.go @@ -23,7 +23,7 @@ import ( "github.com/golang/glog" "k8s.io/kubernetes/pkg/apis/extensions" "k8s.io/kubernetes/pkg/controller" - deploymentutil "k8s.io/kubernetes/pkg/util/deployment" + deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util" "k8s.io/kubernetes/pkg/util/integer" ) @@ -94,7 +94,7 @@ func (dc *DeploymentController) reconcileOldReplicaSets(allRSs []*extensions.Rep return false, fmt.Errorf("could not find available pods: %v", err) } glog.V(4).Infof("New RS %s/%s has %d available pods.", newRS.Namespace, newRS.Name, newRSAvailablePodCount) - maxUnavailable := maxUnavailable(*deployment) + maxUnavailable := deploymentutil.MaxUnavailable(*deployment) // Check if we can scale down. We can scale down in the following 2 cases: // * Some old replica sets have unhealthy replicas, we could safely scale down those unhealthy replicas since that won't further @@ -197,7 +197,7 @@ func (dc *DeploymentController) cleanupUnhealthyReplicas(oldRSs []*extensions.Re // scaleDownOldReplicaSetsForRollingUpdate scales down old replica sets when deployment strategy is "RollingUpdate". // Need check maxUnavailable to ensure availability func (dc *DeploymentController) scaleDownOldReplicaSetsForRollingUpdate(allRSs []*extensions.ReplicaSet, oldRSs []*extensions.ReplicaSet, deployment *extensions.Deployment) (int32, error) { - maxUnavailable := maxUnavailable(*deployment) + maxUnavailable := deploymentutil.MaxUnavailable(*deployment) // Check if we can scale down. minAvailable := deployment.Spec.Replicas - maxUnavailable diff --git a/pkg/controller/deployment/sync.go b/pkg/controller/deployment/sync.go index ff3b37828af..8ce7059a152 100644 --- a/pkg/controller/deployment/sync.go +++ b/pkg/controller/deployment/sync.go @@ -28,7 +28,7 @@ import ( "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/apis/extensions" "k8s.io/kubernetes/pkg/controller" - deploymentutil "k8s.io/kubernetes/pkg/util/deployment" + deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util" utilerrors "k8s.io/kubernetes/pkg/util/errors" labelsutil "k8s.io/kubernetes/pkg/util/labels" podutil "k8s.io/kubernetes/pkg/util/pod" @@ -72,7 +72,7 @@ func (dc *DeploymentController) getAllReplicaSetsAndSyncRevision(deployment *ext } // Calculate the max revision number among all old RSes - maxOldV := maxRevision(allOldRSs) + maxOldV := deploymentutil.MaxRevision(allOldRSs) // Get new replica set with the updated revision number newRS, err := dc.getNewReplicaSet(deployment, rsList, maxOldV, allOldRSs, createIfNotExisted) @@ -232,7 +232,7 @@ func (dc *DeploymentController) getNewReplicaSet(deployment *extensions.Deployme return nil, err } else if existingNewRS != nil { // Set existing new replica set's annotation - if setNewReplicaSetAnnotations(deployment, existingNewRS, newRevision, true) { + if deploymentutil.SetNewReplicaSetAnnotations(deployment, existingNewRS, newRevision, true) { return dc.client.Extensions().ReplicaSets(deployment.ObjectMeta.Namespace).Update(existingNewRS) } return existingNewRS, nil @@ -270,7 +270,7 @@ func (dc *DeploymentController) getNewReplicaSet(deployment *extensions.Deployme newRS.Spec.Replicas = newReplicasCount // Set new replica set's annotation - setNewReplicaSetAnnotations(deployment, &newRS, newRevision, false) + deploymentutil.SetNewReplicaSetAnnotations(deployment, &newRS, newRevision, false) createdRS, err := dc.client.Extensions().ReplicaSets(namespace).Create(&newRS) if err != nil { dc.enqueueDeployment(deployment) @@ -303,7 +303,7 @@ func (dc *DeploymentController) updateDeploymentRevision(deployment *extensions. func (dc *DeploymentController) scale(deployment *extensions.Deployment, newRS *extensions.ReplicaSet, oldRSs []*extensions.ReplicaSet) error { // If there is only one active replica set then we should scale that up to the full count of the // deployment. If there is no active replica set, then we should scale up the newest replica set. - if activeOrLatest := findActiveOrLatest(newRS, oldRSs); activeOrLatest != nil { + if activeOrLatest := deploymentutil.FindActiveOrLatest(newRS, oldRSs); activeOrLatest != nil { if activeOrLatest.Spec.Replicas == deployment.Spec.Replicas { return nil } @@ -331,7 +331,7 @@ func (dc *DeploymentController) scale(deployment *extensions.Deployment, newRS * allowedSize := int32(0) if deployment.Spec.Replicas > 0 { - allowedSize = deployment.Spec.Replicas + maxSurge(*deployment) + allowedSize = deployment.Spec.Replicas + deploymentutil.MaxSurge(*deployment) } // Number of additional replicas that can be either added or removed from the total @@ -365,7 +365,7 @@ func (dc *DeploymentController) scale(deployment *extensions.Deployment, newRS * for i := range allRSs { rs := allRSs[i] - proportion := getProportion(rs, *deployment, deploymentReplicasToAdd, deploymentReplicasAdded) + proportion := deploymentutil.GetProportion(rs, *deployment, deploymentReplicasToAdd, deploymentReplicasAdded) rs.Spec.Replicas += proportion deploymentReplicasAdded += proportion @@ -411,7 +411,7 @@ func (dc *DeploymentController) scaleReplicaSetAndRecordEvent(rs *extensions.Rep func (dc *DeploymentController) scaleReplicaSet(rs *extensions.ReplicaSet, newScale int32, deployment *extensions.Deployment, scalingOperation string) (*extensions.ReplicaSet, error) { // NOTE: This mutates the ReplicaSet passed in. Not sure if that's a good idea. rs.Spec.Replicas = newScale - setReplicasAnnotations(rs, deployment.Spec.Replicas, deployment.Spec.Replicas+maxSurge(*deployment)) + deploymentutil.SetReplicasAnnotations(rs, deployment.Spec.Replicas, deployment.Spec.Replicas+deploymentutil.MaxSurge(*deployment)) rs, err := dc.client.Extensions().ReplicaSets(rs.ObjectMeta.Namespace).Update(rs) if err == nil { dc.eventRecorder.Eventf(deployment, api.EventTypeNormal, "ScalingReplicaSet", "Scaled %s replica set %s to %d", scalingOperation, rs.Name, newScale) @@ -515,7 +515,7 @@ func (dc *DeploymentController) isScalingEvent(d *extensions.Deployment) bool { } allRSs := append(oldRSs, newRS) for _, rs := range controller.FilterActiveReplicaSets(allRSs) { - desired, ok := getDesiredReplicasAnnotation(rs) + desired, ok := deploymentutil.GetDesiredReplicasAnnotation(rs) if !ok { continue } diff --git a/pkg/controller/deployment/sync_test.go b/pkg/controller/deployment/sync_test.go index d08cf3d0777..97a7ccb4825 100644 --- a/pkg/controller/deployment/sync_test.go +++ b/pkg/controller/deployment/sync_test.go @@ -25,6 +25,7 @@ import ( "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" "k8s.io/kubernetes/pkg/client/record" "k8s.io/kubernetes/pkg/controller" + deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util" "k8s.io/kubernetes/pkg/util/intstr" ) @@ -237,7 +238,7 @@ func TestScale(t *testing.T) { if desired, ok := test.desiredReplicasAnnotations[test.newRS.Name]; ok { desiredReplicas = desired } - setReplicasAnnotations(test.newRS, desiredReplicas, desiredReplicas+maxSurge(*test.oldDeployment)) + deploymentutil.SetReplicasAnnotations(test.newRS, desiredReplicas, desiredReplicas+deploymentutil.MaxSurge(*test.oldDeployment)) } for i := range test.oldRSs { rs := test.oldRSs[i] @@ -248,7 +249,7 @@ func TestScale(t *testing.T) { if desired, ok := test.desiredReplicasAnnotations[rs.Name]; ok { desiredReplicas = desired } - setReplicasAnnotations(rs, desiredReplicas, desiredReplicas+maxSurge(*test.oldDeployment)) + deploymentutil.SetReplicasAnnotations(rs, desiredReplicas, desiredReplicas+deploymentutil.MaxSurge(*test.oldDeployment)) } if err := dc.scale(test.deployment, test.newRS, test.oldRSs); err != nil { diff --git a/pkg/controller/deployment/util.go b/pkg/controller/deployment/util.go deleted file mode 100644 index 4070e821790..00000000000 --- a/pkg/controller/deployment/util.go +++ /dev/null @@ -1,272 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package deployment - -import ( - "fmt" - "sort" - "strconv" - - "github.com/golang/glog" - - "k8s.io/kubernetes/pkg/api/annotations" - "k8s.io/kubernetes/pkg/apis/extensions" - "k8s.io/kubernetes/pkg/controller" - deploymentutil "k8s.io/kubernetes/pkg/util/deployment" - "k8s.io/kubernetes/pkg/util/integer" -) - -func maxRevision(allRSs []*extensions.ReplicaSet) int64 { - max := int64(0) - for _, rs := range allRSs { - if v, err := deploymentutil.Revision(rs); err != nil { - // Skip the replica sets when it failed to parse their revision information - glog.V(4).Infof("Error: %v. Couldn't parse revision for replica set %#v, deployment controller will skip it when reconciling revisions.", err, rs) - } else if v > max { - max = v - } - } - return max -} - -// lastRevision finds the second max revision number in all replica sets (the last revision) -func lastRevision(allRSs []*extensions.ReplicaSet) int64 { - max, secMax := int64(0), int64(0) - for _, rs := range allRSs { - if v, err := deploymentutil.Revision(rs); err != nil { - // Skip the replica sets when it failed to parse their revision information - glog.V(4).Infof("Error: %v. Couldn't parse revision for replica set %#v, deployment controller will skip it when reconciling revisions.", err, rs) - } else if v >= max { - secMax = max - max = v - } else if v > secMax { - secMax = v - } - } - return secMax -} - -// setNewReplicaSetAnnotations sets new replica set's annotations appropriately by updating its revision and -// copying required deployment annotations to it; it returns true if replica set's annotation is changed. -func setNewReplicaSetAnnotations(deployment *extensions.Deployment, newRS *extensions.ReplicaSet, newRevision string, exists bool) bool { - // First, copy deployment's annotations (except for apply and revision annotations) - annotationChanged := copyDeploymentAnnotationsToReplicaSet(deployment, newRS) - // Then, update replica set's revision annotation - if newRS.Annotations == nil { - newRS.Annotations = make(map[string]string) - } - // The newRS's revision should be the greatest among all RSes. Usually, its revision number is newRevision (the max revision number - // of all old RSes + 1). However, it's possible that some of the old RSes are deleted after the newRS revision being updated, and - // newRevision becomes smaller than newRS's revision. We should only update newRS revision when it's smaller than newRevision. - if newRS.Annotations[deploymentutil.RevisionAnnotation] < newRevision { - newRS.Annotations[deploymentutil.RevisionAnnotation] = newRevision - annotationChanged = true - glog.V(4).Infof("Updating replica set %q revision to %s", newRS.Name, newRevision) - } - if !exists && setReplicasAnnotations(newRS, deployment.Spec.Replicas, deployment.Spec.Replicas+maxSurge(*deployment)) { - annotationChanged = true - } - return annotationChanged -} - -var annotationsToSkip = map[string]bool{ - annotations.LastAppliedConfigAnnotation: true, - deploymentutil.RevisionAnnotation: true, - deploymentutil.DesiredReplicasAnnotation: true, - deploymentutil.MaxReplicasAnnotation: true, -} - -// skipCopyAnnotation returns true if we should skip copying the annotation with the given annotation key -// TODO: How to decide which annotations should / should not be copied? -// See https://github.com/kubernetes/kubernetes/pull/20035#issuecomment-179558615 -func skipCopyAnnotation(key string) bool { - return annotationsToSkip[key] -} - -// copyDeploymentAnnotationsToReplicaSet copies deployment's annotations to replica set's annotations, -// and returns true if replica set's annotation is changed. -// Note that apply and revision annotations are not copied. -func copyDeploymentAnnotationsToReplicaSet(deployment *extensions.Deployment, rs *extensions.ReplicaSet) bool { - rsAnnotationsChanged := false - if rs.Annotations == nil { - rs.Annotations = make(map[string]string) - } - for k, v := range deployment.Annotations { - // newRS revision is updated automatically in getNewReplicaSet, and the deployment's revision number is then updated - // by copying its newRS revision number. We should not copy deployment's revision to its newRS, since the update of - // deployment revision number may fail (revision becomes stale) and the revision number in newRS is more reliable. - if skipCopyAnnotation(k) || rs.Annotations[k] == v { - continue - } - rs.Annotations[k] = v - rsAnnotationsChanged = true - } - return rsAnnotationsChanged -} - -// setDeploymentAnnotationsTo sets deployment's annotations as given RS's annotations. -// This action should be done if and only if the deployment is rolling back to this rs. -// Note that apply and revision annotations are not changed. -func setDeploymentAnnotationsTo(deployment *extensions.Deployment, rollbackToRS *extensions.ReplicaSet) { - deployment.Annotations = getSkippedAnnotations(deployment.Annotations) - for k, v := range rollbackToRS.Annotations { - if !skipCopyAnnotation(k) { - deployment.Annotations[k] = v - } - } -} - -func getSkippedAnnotations(annotations map[string]string) map[string]string { - skippedAnnotations := make(map[string]string) - for k, v := range annotations { - if skipCopyAnnotation(k) { - skippedAnnotations[k] = v - } - } - return skippedAnnotations -} - -// findActiveOrLatest returns the only active or the latest replica set in case there is at most one active -// replica set. If there are more active replica sets, then we should proportionally scale them. -func findActiveOrLatest(newRS *extensions.ReplicaSet, oldRSs []*extensions.ReplicaSet) *extensions.ReplicaSet { - if newRS == nil && len(oldRSs) == 0 { - return nil - } - - sort.Sort(sort.Reverse(controller.ReplicaSetsByCreationTimestamp(oldRSs))) - allRSs := controller.FilterActiveReplicaSets(append(oldRSs, newRS)) - - switch len(allRSs) { - case 0: - // If there is no active replica set then we should return the newest. - if newRS != nil { - return newRS - } - return oldRSs[0] - case 1: - return allRSs[0] - default: - return nil - } -} - -func getDesiredReplicasAnnotation(rs *extensions.ReplicaSet) (int32, bool) { - return getIntFromAnnotation(rs, deploymentutil.DesiredReplicasAnnotation) -} - -func getMaxReplicasAnnotation(rs *extensions.ReplicaSet) (int32, bool) { - return getIntFromAnnotation(rs, deploymentutil.MaxReplicasAnnotation) -} - -func getIntFromAnnotation(rs *extensions.ReplicaSet, annotationKey string) (int32, bool) { - annotationValue, ok := rs.Annotations[annotationKey] - if !ok { - return int32(0), false - } - intValue, err := strconv.Atoi(annotationValue) - if err != nil { - glog.Warningf("Cannot convert the value %q with annotation key %q for the replica set %q", - annotationValue, annotationKey, rs.Name) - return int32(0), false - } - return int32(intValue), true -} - -func setReplicasAnnotations(rs *extensions.ReplicaSet, desiredReplicas, maxReplicas int32) bool { - updated := false - if rs.Annotations == nil { - rs.Annotations = make(map[string]string) - } - desiredString := fmt.Sprintf("%d", desiredReplicas) - if hasString := rs.Annotations[deploymentutil.DesiredReplicasAnnotation]; hasString != desiredString { - rs.Annotations[deploymentutil.DesiredReplicasAnnotation] = desiredString - updated = true - } - maxString := fmt.Sprintf("%d", maxReplicas) - if hasString := rs.Annotations[deploymentutil.MaxReplicasAnnotation]; hasString != maxString { - rs.Annotations[deploymentutil.MaxReplicasAnnotation] = maxString - updated = true - } - return updated -} - -// maxUnavailable returns the maximum unavailable pods a rolling deployment can take. -func maxUnavailable(deployment extensions.Deployment) int32 { - if !deploymentutil.IsRollingUpdate(&deployment) { - return int32(0) - } - // Error caught by validation - _, maxUnavailable, _ := deploymentutil.ResolveFenceposts(&deployment.Spec.Strategy.RollingUpdate.MaxSurge, &deployment.Spec.Strategy.RollingUpdate.MaxUnavailable, deployment.Spec.Replicas) - return maxUnavailable -} - -// maxSurge returns the maximum surge pods a rolling deployment can take. -func maxSurge(deployment extensions.Deployment) int32 { - if !deploymentutil.IsRollingUpdate(&deployment) { - return int32(0) - } - // Error caught by validation - maxSurge, _, _ := deploymentutil.ResolveFenceposts(&deployment.Spec.Strategy.RollingUpdate.MaxSurge, &deployment.Spec.Strategy.RollingUpdate.MaxUnavailable, deployment.Spec.Replicas) - return maxSurge -} - -// getProportion will estimate the proportion for the provided replica set using 1. the current size -// of the parent deployment, 2. the replica count that needs be added on the replica sets of the -// deployment, and 3. the total replicas added in the replica sets of the deployment so far. -func getProportion(rs *extensions.ReplicaSet, d extensions.Deployment, deploymentReplicasToAdd, deploymentReplicasAdded int32) int32 { - if rs == nil || rs.Spec.Replicas == 0 || deploymentReplicasToAdd == 0 || deploymentReplicasToAdd == deploymentReplicasAdded { - return int32(0) - } - - rsFraction := getReplicaSetFraction(*rs, d) - allowed := deploymentReplicasToAdd - deploymentReplicasAdded - - if deploymentReplicasToAdd > 0 { - // Use the minimum between the replica set fraction and the maximum allowed replicas - // when scaling up. This way we ensure we will not scale up more than the allowed - // replicas we can add. - return integer.Int32Min(rsFraction, allowed) - } - // Use the maximum between the replica set fraction and the maximum allowed replicas - // when scaling down. This way we ensure we will not scale down more than the allowed - // replicas we can remove. - return integer.Int32Max(rsFraction, allowed) -} - -// getReplicaSetFraction estimates the fraction of replicas a replica set can have in -// 1. a scaling event during a rollout or 2. when scaling a paused deployment. -func getReplicaSetFraction(rs extensions.ReplicaSet, d extensions.Deployment) int32 { - // If we are scaling down to zero then the fraction of this replica set is its whole size (negative) - if d.Spec.Replicas == int32(0) { - return -rs.Spec.Replicas - } - - deploymentReplicas := d.Spec.Replicas + maxSurge(d) - annotatedReplicas, ok := getMaxReplicasAnnotation(&rs) - if !ok { - // If we cannot find the annotation then fallback to the current deployment size. Note that this - // will not be an accurate proportion estimation in case other replica sets have different values - // which means that the deployment was scaled at some point but we at least will stay in limits - // due to the min-max comparisons in getProportion. - annotatedReplicas = d.Status.Replicas - } - - // We should never proportionally scale up from zero which means rs.spec.replicas and annotatedReplicas - // will never be zero here. - newRSsize := (float64(rs.Spec.Replicas * deploymentReplicas)) / float64(annotatedReplicas) - return integer.RoundToInt32(newRSsize) - rs.Spec.Replicas -} diff --git a/pkg/util/deployment/deployment.go b/pkg/controller/deployment/util/deployment_util.go similarity index 65% rename from pkg/util/deployment/deployment.go rename to pkg/controller/deployment/util/deployment_util.go index 732361b871d..e99a55e6682 100644 --- a/pkg/util/deployment/deployment.go +++ b/pkg/controller/deployment/util/deployment_util.go @@ -1,5 +1,5 @@ /* -Copyright 2015 The Kubernetes Authors. +Copyright 2016 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -14,16 +14,18 @@ See the License for the specific language governing permissions and limitations under the License. */ -package deployment +package util import ( "fmt" + "sort" "strconv" "time" "github.com/golang/glog" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/annotations" "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/apis/extensions" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" @@ -39,7 +41,7 @@ import ( ) const ( - // The revision annotation of a deployment's replica sets which records its rollout sequence + // RevisionAnnotation is the revision annotation of a deployment's replica sets which records its rollout sequence RevisionAnnotation = "deployment.kubernetes.io/revision" // DesiredReplicasAnnotation is the desired replicas for a deployment recorded as an annotation // in its replica sets. Helps in separating scaling events from the rollout process and for @@ -50,12 +52,258 @@ const ( // proportions in case the deployment has surge replicas. MaxReplicasAnnotation = "deployment.kubernetes.io/max-replicas" - // Here are the possible rollback event reasons - RollbackRevisionNotFound = "DeploymentRollbackRevisionNotFound" + // RollbackRevisionNotFound is not found rollback event reason + RollbackRevisionNotFound = "DeploymentRollbackRevisionNotFound" + // RollbackTemplateUnchanged is the template unchanged rollback event reason RollbackTemplateUnchanged = "DeploymentRollbackTemplateUnchanged" - RollbackDone = "DeploymentRollback" + // RollbackDone is the done rollback event reason + RollbackDone = "DeploymentRollback" ) +// MaxRevision finds the highest revision in the replica sets +func MaxRevision(allRSs []*extensions.ReplicaSet) int64 { + max := int64(0) + for _, rs := range allRSs { + if v, err := Revision(rs); err != nil { + // Skip the replica sets when it failed to parse their revision information + glog.V(4).Infof("Error: %v. Couldn't parse revision for replica set %#v, deployment controller will skip it when reconciling revisions.", err, rs) + } else if v > max { + max = v + } + } + return max +} + +// LastRevision finds the second max revision number in all replica sets (the last revision) +func LastRevision(allRSs []*extensions.ReplicaSet) int64 { + max, secMax := int64(0), int64(0) + for _, rs := range allRSs { + if v, err := Revision(rs); err != nil { + // Skip the replica sets when it failed to parse their revision information + glog.V(4).Infof("Error: %v. Couldn't parse revision for replica set %#v, deployment controller will skip it when reconciling revisions.", err, rs) + } else if v >= max { + secMax = max + max = v + } else if v > secMax { + secMax = v + } + } + return secMax +} + +// SetNewReplicaSetAnnotations sets new replica set's annotations appropriately by updating its revision and +// copying required deployment annotations to it; it returns true if replica set's annotation is changed. +func SetNewReplicaSetAnnotations(deployment *extensions.Deployment, newRS *extensions.ReplicaSet, newRevision string, exists bool) bool { + // First, copy deployment's annotations (except for apply and revision annotations) + annotationChanged := copyDeploymentAnnotationsToReplicaSet(deployment, newRS) + // Then, update replica set's revision annotation + if newRS.Annotations == nil { + newRS.Annotations = make(map[string]string) + } + // The newRS's revision should be the greatest among all RSes. Usually, its revision number is newRevision (the max revision number + // of all old RSes + 1). However, it's possible that some of the old RSes are deleted after the newRS revision being updated, and + // newRevision becomes smaller than newRS's revision. We should only update newRS revision when it's smaller than newRevision. + if newRS.Annotations[RevisionAnnotation] < newRevision { + newRS.Annotations[RevisionAnnotation] = newRevision + annotationChanged = true + glog.V(4).Infof("Updating replica set %q revision to %s", newRS.Name, newRevision) + } + if !exists && SetReplicasAnnotations(newRS, deployment.Spec.Replicas, deployment.Spec.Replicas+MaxSurge(*deployment)) { + annotationChanged = true + } + return annotationChanged +} + +var annotationsToSkip = map[string]bool{ + annotations.LastAppliedConfigAnnotation: true, + RevisionAnnotation: true, + DesiredReplicasAnnotation: true, + MaxReplicasAnnotation: true, +} + +// skipCopyAnnotation returns true if we should skip copying the annotation with the given annotation key +// TODO: How to decide which annotations should / should not be copied? +// See https://github.com/kubernetes/kubernetes/pull/20035#issuecomment-179558615 +func skipCopyAnnotation(key string) bool { + return annotationsToSkip[key] +} + +// copyDeploymentAnnotationsToReplicaSet copies deployment's annotations to replica set's annotations, +// and returns true if replica set's annotation is changed. +// Note that apply and revision annotations are not copied. +func copyDeploymentAnnotationsToReplicaSet(deployment *extensions.Deployment, rs *extensions.ReplicaSet) bool { + rsAnnotationsChanged := false + if rs.Annotations == nil { + rs.Annotations = make(map[string]string) + } + for k, v := range deployment.Annotations { + // newRS revision is updated automatically in getNewReplicaSet, and the deployment's revision number is then updated + // by copying its newRS revision number. We should not copy deployment's revision to its newRS, since the update of + // deployment revision number may fail (revision becomes stale) and the revision number in newRS is more reliable. + if skipCopyAnnotation(k) || rs.Annotations[k] == v { + continue + } + rs.Annotations[k] = v + rsAnnotationsChanged = true + } + return rsAnnotationsChanged +} + +// SetDeploymentAnnotationsTo sets deployment's annotations as given RS's annotations. +// This action should be done if and only if the deployment is rolling back to this rs. +// Note that apply and revision annotations are not changed. +func SetDeploymentAnnotationsTo(deployment *extensions.Deployment, rollbackToRS *extensions.ReplicaSet) { + deployment.Annotations = getSkippedAnnotations(deployment.Annotations) + for k, v := range rollbackToRS.Annotations { + if !skipCopyAnnotation(k) { + deployment.Annotations[k] = v + } + } +} + +func getSkippedAnnotations(annotations map[string]string) map[string]string { + skippedAnnotations := make(map[string]string) + for k, v := range annotations { + if skipCopyAnnotation(k) { + skippedAnnotations[k] = v + } + } + return skippedAnnotations +} + +// FindActiveOrLatest returns the only active or the latest replica set in case there is at most one active +// replica set. If there are more active replica sets, then we should proportionally scale them. +func FindActiveOrLatest(newRS *extensions.ReplicaSet, oldRSs []*extensions.ReplicaSet) *extensions.ReplicaSet { + if newRS == nil && len(oldRSs) == 0 { + return nil + } + + sort.Sort(sort.Reverse(controller.ReplicaSetsByCreationTimestamp(oldRSs))) + allRSs := controller.FilterActiveReplicaSets(append(oldRSs, newRS)) + + switch len(allRSs) { + case 0: + // If there is no active replica set then we should return the newest. + if newRS != nil { + return newRS + } + return oldRSs[0] + case 1: + return allRSs[0] + default: + return nil + } +} + +// GetDesiredReplicasAnnotation returns the number of desired replicas +func GetDesiredReplicasAnnotation(rs *extensions.ReplicaSet) (int32, bool) { + return getIntFromAnnotation(rs, DesiredReplicasAnnotation) +} + +func getMaxReplicasAnnotation(rs *extensions.ReplicaSet) (int32, bool) { + return getIntFromAnnotation(rs, MaxReplicasAnnotation) +} + +func getIntFromAnnotation(rs *extensions.ReplicaSet, annotationKey string) (int32, bool) { + annotationValue, ok := rs.Annotations[annotationKey] + if !ok { + return int32(0), false + } + intValue, err := strconv.Atoi(annotationValue) + if err != nil { + glog.Warningf("Cannot convert the value %q with annotation key %q for the replica set %q", + annotationValue, annotationKey, rs.Name) + return int32(0), false + } + return int32(intValue), true +} + +// SetReplicasAnnotations sets the desiredReplicas and maxReplicas into the annotations +func SetReplicasAnnotations(rs *extensions.ReplicaSet, desiredReplicas, maxReplicas int32) bool { + updated := false + if rs.Annotations == nil { + rs.Annotations = make(map[string]string) + } + desiredString := fmt.Sprintf("%d", desiredReplicas) + if hasString := rs.Annotations[DesiredReplicasAnnotation]; hasString != desiredString { + rs.Annotations[DesiredReplicasAnnotation] = desiredString + updated = true + } + maxString := fmt.Sprintf("%d", maxReplicas) + if hasString := rs.Annotations[MaxReplicasAnnotation]; hasString != maxString { + rs.Annotations[MaxReplicasAnnotation] = maxString + updated = true + } + return updated +} + +// MaxUnavailable returns the maximum unavailable pods a rolling deployment can take. +func MaxUnavailable(deployment extensions.Deployment) int32 { + if !IsRollingUpdate(&deployment) { + return int32(0) + } + // Error caught by validation + _, maxUnavailable, _ := ResolveFenceposts(&deployment.Spec.Strategy.RollingUpdate.MaxSurge, &deployment.Spec.Strategy.RollingUpdate.MaxUnavailable, deployment.Spec.Replicas) + return maxUnavailable +} + +// MaxSurge returns the maximum surge pods a rolling deployment can take. +func MaxSurge(deployment extensions.Deployment) int32 { + if !IsRollingUpdate(&deployment) { + return int32(0) + } + // Error caught by validation + maxSurge, _, _ := ResolveFenceposts(&deployment.Spec.Strategy.RollingUpdate.MaxSurge, &deployment.Spec.Strategy.RollingUpdate.MaxUnavailable, deployment.Spec.Replicas) + return maxSurge +} + +// GetProportion will estimate the proportion for the provided replica set using 1. the current size +// of the parent deployment, 2. the replica count that needs be added on the replica sets of the +// deployment, and 3. the total replicas added in the replica sets of the deployment so far. +func GetProportion(rs *extensions.ReplicaSet, d extensions.Deployment, deploymentReplicasToAdd, deploymentReplicasAdded int32) int32 { + if rs == nil || rs.Spec.Replicas == 0 || deploymentReplicasToAdd == 0 || deploymentReplicasToAdd == deploymentReplicasAdded { + return int32(0) + } + + rsFraction := getReplicaSetFraction(*rs, d) + allowed := deploymentReplicasToAdd - deploymentReplicasAdded + + if deploymentReplicasToAdd > 0 { + // Use the minimum between the replica set fraction and the maximum allowed replicas + // when scaling up. This way we ensure we will not scale up more than the allowed + // replicas we can add. + return integer.Int32Min(rsFraction, allowed) + } + // Use the maximum between the replica set fraction and the maximum allowed replicas + // when scaling down. This way we ensure we will not scale down more than the allowed + // replicas we can remove. + return integer.Int32Max(rsFraction, allowed) +} + +// getReplicaSetFraction estimates the fraction of replicas a replica set can have in +// 1. a scaling event during a rollout or 2. when scaling a paused deployment. +func getReplicaSetFraction(rs extensions.ReplicaSet, d extensions.Deployment) int32 { + // If we are scaling down to zero then the fraction of this replica set is its whole size (negative) + if d.Spec.Replicas == int32(0) { + return -rs.Spec.Replicas + } + + deploymentReplicas := d.Spec.Replicas + MaxSurge(d) + annotatedReplicas, ok := getMaxReplicasAnnotation(&rs) + if !ok { + // If we cannot find the annotation then fallback to the current deployment size. Note that this + // will not be an accurate proportion estimation in case other replica sets have different values + // which means that the deployment was scaled at some point but we at least will stay in limits + // due to the min-max comparisons in getProportion. + annotatedReplicas = d.Status.Replicas + } + + // We should never proportionally scale up from zero which means rs.spec.replicas and annotatedReplicas + // will never be zero here. + newRSsize := (float64(rs.Spec.Replicas * deploymentReplicas)) / float64(annotatedReplicas) + return integer.RoundToInt32(newRSsize) - rs.Spec.Replicas +} + // GetAllReplicaSets returns the old and new replica sets targeted by the given Deployment. It gets PodList and ReplicaSetList from client interface. // Note that the first set of old replica sets doesn't include the ones with no pods, and the second set of old replica sets include all old replica sets. // The third returned value is the new replica set, and it may be nil if it doesn't exist yet. @@ -227,6 +475,7 @@ func FindOldReplicaSets(deployment *extensions.Deployment, rsList []extensions.R return requiredRSs, allRSs, nil } +// WaitForReplicaSetUpdated polls the replica set until it is updated. func WaitForReplicaSetUpdated(c clientset.Interface, desiredGeneration int64, namespace, name string) error { return wait.Poll(10*time.Millisecond, 1*time.Minute, func() (bool, error) { rs, err := c.Extensions().ReplicaSets(namespace).Get(name) @@ -237,6 +486,7 @@ func WaitForReplicaSetUpdated(c clientset.Interface, desiredGeneration int64, na }) } +// WaitForPodsHashPopulated polls the replica set until updated and fully labeled. func WaitForPodsHashPopulated(c clientset.Interface, desiredGeneration int64, namespace, name string) error { return wait.Poll(1*time.Second, 1*time.Minute, func() (bool, error) { rs, err := c.Extensions().ReplicaSets(namespace).Get(name) @@ -277,7 +527,7 @@ func LabelPodsWithHash(podList *api.PodList, rs *extensions.ReplicaSet, c client return allPodsLabeled, nil } -// Returns the desired PodTemplateSpec for the new ReplicaSet corresponding to the given ReplicaSet. +// GetNewReplicaSetTemplate returns the desired PodTemplateSpec for the new ReplicaSet corresponding to the given ReplicaSet. func GetNewReplicaSetTemplate(deployment *extensions.Deployment) api.PodTemplateSpec { // newRS will have the same template as in deployment spec, plus a unique label in some cases. newRSTemplate := api.PodTemplateSpec{ @@ -301,7 +551,7 @@ func SetFromReplicaSetTemplate(deployment *extensions.Deployment, template api.P return deployment } -// Returns the sum of Replicas of the given replica sets. +// GetReplicaCountForReplicaSets returns the sum of Replicas of the given replica sets. func GetReplicaCountForReplicaSets(replicaSets []*extensions.ReplicaSet) int32 { totalReplicaCount := int32(0) for _, rs := range replicaSets { @@ -363,6 +613,7 @@ func countAvailablePods(pods []api.Pod, minReadySeconds int32) int32 { return availablePodCount } +// IsPodAvailable return true if the pod is available. func IsPodAvailable(pod *api.Pod, minReadySeconds int32, now time.Time) bool { if !controller.IsPodActive(*pod) { return false @@ -409,6 +660,7 @@ func Revision(rs *extensions.ReplicaSet) (int64, error) { return strconv.ParseInt(v, 10, 64) } +// IsRollingUpdate returns true if the strategy type is a rolling update. func IsRollingUpdate(deployment *extensions.Deployment) bool { return deployment.Spec.Strategy.Type == extensions.RollingUpdateDeploymentStrategyType } @@ -459,7 +711,7 @@ func IsSaturated(deployment *extensions.Deployment, rs *extensions.ReplicaSet) b return rs.Spec.Replicas == deployment.Spec.Replicas && int32(desired) == deployment.Spec.Replicas } -// Polls for deployment to be updated so that deployment.Status.ObservedGeneration >= desiredGeneration. +// WaitForObservedDeployment polls for deployment to be updated so that deployment.Status.ObservedGeneration >= desiredGeneration. // Returns error if polling timesout. func WaitForObservedDeployment(getDeploymentFunc func() (*extensions.Deployment, error), desiredGeneration int64, interval, timeout time.Duration) error { // TODO: This should take clientset.Interface when all code is updated to use clientset. Keeping it this way allows the function to be used by callers who have client.Interface. diff --git a/pkg/util/deployment/deployment_test.go b/pkg/controller/deployment/util/deployment_util_test.go similarity index 99% rename from pkg/util/deployment/deployment_test.go rename to pkg/controller/deployment/util/deployment_util_test.go index 6dde0377aa1..262631da8a3 100644 --- a/pkg/util/deployment/deployment_test.go +++ b/pkg/controller/deployment/util/deployment_util_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package deployment +package util import ( "fmt" diff --git a/pkg/kubectl/describe.go b/pkg/kubectl/describe.go index 702049fcafa..f67e949c8a5 100644 --- a/pkg/kubectl/describe.go +++ b/pkg/kubectl/describe.go @@ -42,12 +42,12 @@ import ( clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" client "k8s.io/kubernetes/pkg/client/unversioned" adapter "k8s.io/kubernetes/pkg/client/unversioned/adapters/internalclientset" + deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util" "k8s.io/kubernetes/pkg/fieldpath" "k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/kubelet/qos" "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/types" - deploymentutil "k8s.io/kubernetes/pkg/util/deployment" "k8s.io/kubernetes/pkg/util/intstr" "k8s.io/kubernetes/pkg/util/sets" ) diff --git a/pkg/kubectl/history.go b/pkg/kubectl/history.go index 65888d8524d..067eb2f4300 100644 --- a/pkg/kubectl/history.go +++ b/pkg/kubectl/history.go @@ -25,8 +25,8 @@ import ( "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/apis/extensions" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" + deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util" "k8s.io/kubernetes/pkg/runtime" - deploymentutil "k8s.io/kubernetes/pkg/util/deployment" sliceutil "k8s.io/kubernetes/pkg/util/slice" ) diff --git a/pkg/kubectl/rollback.go b/pkg/kubectl/rollback.go index afcb3ace780..f5294b04248 100644 --- a/pkg/kubectl/rollback.go +++ b/pkg/kubectl/rollback.go @@ -26,8 +26,8 @@ import ( "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/apis/extensions" client "k8s.io/kubernetes/pkg/client/unversioned" + deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util" "k8s.io/kubernetes/pkg/runtime" - deploymentutil "k8s.io/kubernetes/pkg/util/deployment" "k8s.io/kubernetes/pkg/watch" ) diff --git a/pkg/kubectl/rolling_updater.go b/pkg/kubectl/rolling_updater.go index 45082e7a070..2872b1ef6b2 100644 --- a/pkg/kubectl/rolling_updater.go +++ b/pkg/kubectl/rolling_updater.go @@ -28,9 +28,9 @@ import ( "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/unversioned" client "k8s.io/kubernetes/pkg/client/unversioned" + deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util" "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/runtime" - "k8s.io/kubernetes/pkg/util/deployment" "k8s.io/kubernetes/pkg/util/integer" "k8s.io/kubernetes/pkg/util/intstr" "k8s.io/kubernetes/pkg/util/wait" @@ -206,7 +206,7 @@ func (r *RollingUpdater) Update(config *RollingUpdaterConfig) error { } // maxSurge is the maximum scaling increment and maxUnavailable are the maximum pods // that can be unavailable during a rollout. - maxSurge, maxUnavailable, err := deployment.ResolveFenceposts(&config.MaxSurge, &config.MaxUnavailable, desired) + maxSurge, maxUnavailable, err := deploymentutil.ResolveFenceposts(&config.MaxSurge, &config.MaxUnavailable, desired) if err != nil { return err } @@ -420,7 +420,7 @@ func (r *RollingUpdater) readyPods(oldRc, newRc *api.ReplicationController, minR return 0, 0, err } for _, pod := range pods.Items { - if !deployment.IsPodAvailable(&pod, minReadySeconds, r.nowFn().Time) { + if !deploymentutil.IsPodAvailable(&pod, minReadySeconds, r.nowFn().Time) { continue } switch controller.Name { diff --git a/pkg/kubectl/stop.go b/pkg/kubectl/stop.go index ea8ed5155bc..1d3a891bb7d 100644 --- a/pkg/kubectl/stop.go +++ b/pkg/kubectl/stop.go @@ -28,9 +28,9 @@ import ( "k8s.io/kubernetes/pkg/apis/batch" "k8s.io/kubernetes/pkg/apis/extensions" client "k8s.io/kubernetes/pkg/client/unversioned" + deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util" "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/util" - deploymentutil "k8s.io/kubernetes/pkg/util/deployment" utilerrors "k8s.io/kubernetes/pkg/util/errors" "k8s.io/kubernetes/pkg/util/wait" ) diff --git a/pkg/kubectl/stop_test.go b/pkg/kubectl/stop_test.go index 1964b81b0af..05b5955b94f 100644 --- a/pkg/kubectl/stop_test.go +++ b/pkg/kubectl/stop_test.go @@ -29,8 +29,8 @@ import ( "k8s.io/kubernetes/pkg/apis/extensions" client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/client/unversioned/testclient" + deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util" "k8s.io/kubernetes/pkg/runtime" - deploymentutil "k8s.io/kubernetes/pkg/util/deployment" ) func TestReplicationControllerStop(t *testing.T) { diff --git a/test/e2e/deployment.go b/test/e2e/deployment.go index 09136c79d0b..1d4550c4b47 100644 --- a/test/e2e/deployment.go +++ b/test/e2e/deployment.go @@ -30,10 +30,10 @@ import ( clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" client "k8s.io/kubernetes/pkg/client/unversioned" adapter "k8s.io/kubernetes/pkg/client/unversioned/adapters/internalclientset" + deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util" "k8s.io/kubernetes/pkg/kubectl" "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/util" - deploymentutil "k8s.io/kubernetes/pkg/util/deployment" "k8s.io/kubernetes/pkg/util/intstr" "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/watch" diff --git a/test/e2e/framework/util.go b/test/e2e/framework/util.go index 30019f354a8..2310cb85240 100644 --- a/test/e2e/framework/util.go +++ b/test/e2e/framework/util.go @@ -55,6 +55,7 @@ import ( "k8s.io/kubernetes/pkg/client/unversioned/clientcmd" clientcmdapi "k8s.io/kubernetes/pkg/client/unversioned/clientcmd/api" gcecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/gce" + deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util" "k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/kubectl" "k8s.io/kubernetes/pkg/kubelet/util/format" @@ -64,7 +65,6 @@ import ( sshutil "k8s.io/kubernetes/pkg/ssh" "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util" - deploymentutil "k8s.io/kubernetes/pkg/util/deployment" labelsutil "k8s.io/kubernetes/pkg/util/labels" "k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/wait"