From 332d151d615ca8dba838d98f0624fb0d88bf98e1 Mon Sep 17 00:00:00 2001 From: Michail Kargakis Date: Sat, 25 Jun 2016 11:31:32 +0200 Subject: [PATCH] Break deployment controller into separate self-contained files * rolling.go (has all the logic for rolling deployments) * recreate.go (has all the logic for recreate deployments) * sync.go (has all the logic for getting and scaling replica sets) * rollback.go (has all the logic for rolling back a deployment) * util.go (contains all the utilities used throughout the controller) Leave back at deployment_controller.go all the necessary bits for creating, setting up, and running the controller loop. Also add package documentation. --- .../deployment/deployment_controller.go | 980 +----------------- .../deployment/deployment_controller_test.go | 800 -------------- pkg/controller/deployment/recreate.go | 92 ++ pkg/controller/deployment/rollback.go | 105 ++ pkg/controller/deployment/rolling.go | 243 +++++ pkg/controller/deployment/rolling_test.go | 505 +++++++++ pkg/controller/deployment/sync.go | 527 ++++++++++ pkg/controller/deployment/sync_test.go | 348 +++++++ pkg/controller/deployment/util.go | 111 ++ 9 files changed, 1935 insertions(+), 1776 deletions(-) create mode 100644 pkg/controller/deployment/recreate.go create mode 100644 pkg/controller/deployment/rollback.go create mode 100644 pkg/controller/deployment/rolling.go create mode 100644 pkg/controller/deployment/rolling_test.go create mode 100644 pkg/controller/deployment/sync.go create mode 100644 pkg/controller/deployment/sync_test.go diff --git a/pkg/controller/deployment/deployment_controller.go b/pkg/controller/deployment/deployment_controller.go index 9e937b2b9d9..25786fc175a 100644 --- a/pkg/controller/deployment/deployment_controller.go +++ b/pkg/controller/deployment/deployment_controller.go @@ -14,19 +14,19 @@ See the License for the specific language governing permissions and limitations under the License. */ +// Package deployment contains all the logic for handling Kubernetes Deployments. +// It implements a set of strategies (rolling, recreate) for deploying an application, +// the means to rollback to previous versions, proportional scaling for mitigating +// risk, cleanup policy, and other useful features of Deployments. package deployment import ( "fmt" "reflect" - "sort" - "strconv" "time" "github.com/golang/glog" "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/api/annotations" - "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/apis/extensions" "k8s.io/kubernetes/pkg/client/cache" @@ -36,13 +36,7 @@ import ( "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/framework" "k8s.io/kubernetes/pkg/runtime" - deploymentutil "k8s.io/kubernetes/pkg/util/deployment" - utilerrors "k8s.io/kubernetes/pkg/util/errors" - "k8s.io/kubernetes/pkg/util/integer" - labelsutil "k8s.io/kubernetes/pkg/util/labels" "k8s.io/kubernetes/pkg/util/metrics" - podutil "k8s.io/kubernetes/pkg/util/pod" - rsutil "k8s.io/kubernetes/pkg/util/replicaset" utilruntime "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/util/workqueue" @@ -460,969 +454,3 @@ func (dc *DeploymentController) syncDeployment(key string) error { } return fmt.Errorf("unexpected deployment strategy type: %s", d.Spec.Strategy.Type) } - -// sync is responsible for reconciling deployments on scaling events or when they -// are paused. 
-func (dc *DeploymentController) sync(deployment *extensions.Deployment) error { - newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(deployment, false) - if err != nil { - return err - } - if err := dc.scale(deployment, newRS, oldRSs); err != nil { - // If we get an error while trying to scale, the deployment will be requeued - // so we can abort this resync - return err - } - dc.cleanupDeployment(oldRSs, deployment) - - allRSs := append(oldRSs, newRS) - return dc.syncDeploymentStatus(allRSs, newRS, deployment) -} - -// scale scales proportionally in order to mitigate risk. Otherwise, scaling up can increase the size -// of the new replica set and scaling down can decrease the sizes of the old ones, both of which would -// have the effect of hastening the rollout progress, which could produce a higher proportion of unavailable -// replicas in the event of a problem with the rolled out template. Should run only on scaling events or -// when a deployment is paused and not during the normal rollout process. -func (dc *DeploymentController) scale(deployment *extensions.Deployment, newRS *extensions.ReplicaSet, oldRSs []*extensions.ReplicaSet) error { - // If there is only one active replica set then we should scale that up to the full count of the - // deployment. If there is no active replica set, then we should scale up the newest replica set. - if activeOrLatest := findActiveOrLatest(newRS, oldRSs); activeOrLatest != nil { - if activeOrLatest.Spec.Replicas == deployment.Spec.Replicas { - return nil - } - _, _, err := dc.scaleReplicaSetAndRecordEvent(activeOrLatest, deployment.Spec.Replicas, deployment) - return err - } - - // If the new replica set is saturated, old replica sets should be fully scaled down. - // This case handles replica set adoption during a saturated new replica set. - if deploymentutil.IsSaturated(deployment, newRS) { - for _, old := range controller.FilterActiveReplicaSets(oldRSs) { - if _, _, err := dc.scaleReplicaSetAndRecordEvent(old, 0, deployment); err != nil { - return err - } - } - return nil - } - - // There are old replica sets with pods and the new replica set is not saturated. - // We need to proportionally scale all replica sets (new and old) in case of a - // rolling deployment. - if deploymentutil.IsRollingUpdate(deployment) { - allRSs := controller.FilterActiveReplicaSets(append(oldRSs, newRS)) - allRSsReplicas := deploymentutil.GetReplicaCountForReplicaSets(allRSs) - - allowedSize := int32(0) - if deployment.Spec.Replicas > 0 { - allowedSize = deployment.Spec.Replicas + maxSurge(*deployment) - } - - // Number of additional replicas that can be either added or removed from the total - // replicas count. These replicas should be distributed proportionally to the active - // replica sets. - deploymentReplicasToAdd := allowedSize - allRSsReplicas - - // The additional replicas should be distributed proportionally amongst the active - // replica sets from the larger to the smaller in size replica set. Scaling direction - // drives what happens in case we are trying to scale replica sets of the same size. - // In such a case when scaling up, we should scale up newer replica sets first, and - // when scaling down, we should scale down older replica sets first. 
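As an aside, the proportional math described above is easier to see in isolation. The sketch below is a hypothetical simplification of the getProportion helper used just below (the real helper also weights each replica set by its max-replicas annotation and bounds the result, with integer rounding leftovers absorbed later by the largest replica set):

package main

import "fmt"

// proportionalShare hands each active replica set a share of the replicas
// being added (or removed, when replicasToAdd is negative) proportional to
// its current size. Hypothetical stand-in for getProportion; rounding
// leftovers are distributed separately, as in the loop that follows.
func proportionalShare(rsReplicas, totalReplicas, replicasToAdd int32) int32 {
	if totalReplicas == 0 {
		return 0
	}
	return int32(int64(rsReplicas) * int64(replicasToAdd) / int64(totalReplicas))
}

func main() {
	// Scaling a deployment from 5 to 10 replicas: a new RS of size 2 and an
	// old RS of size 3 split the 5 extra replicas 2:3, ending at 4 and 6.
	fmt.Println(proportionalShare(2, 5, 5), proportionalShare(3, 5, 5)) // 2 3
}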
- scalingOperation := "up" - switch { - case deploymentReplicasToAdd > 0: - sort.Sort(controller.ReplicaSetsBySizeNewer(allRSs)) - - case deploymentReplicasToAdd < 0: - sort.Sort(controller.ReplicaSetsBySizeOlder(allRSs)) - scalingOperation = "down" - - default: /* deploymentReplicasToAdd == 0 */ - // Nothing to add. - return nil - } - - // Iterate over all active replica sets and estimate proportions for each of them. - // The absolute value of deploymentReplicasAdded should never exceed the absolute - // value of deploymentReplicasToAdd. - deploymentReplicasAdded := int32(0) - for i := range allRSs { - rs := allRSs[i] - - proportion := getProportion(rs, *deployment, deploymentReplicasToAdd, deploymentReplicasAdded) - - rs.Spec.Replicas += proportion - deploymentReplicasAdded += proportion - } - - // Update all replica sets - for i := range allRSs { - rs := allRSs[i] - - // Add/remove any leftovers to the largest replica set. - if i == 0 { - leftover := deploymentReplicasToAdd - deploymentReplicasAdded - rs.Spec.Replicas += leftover - if rs.Spec.Replicas < 0 { - rs.Spec.Replicas = 0 - } - } - - if _, err := dc.scaleReplicaSet(rs, rs.Spec.Replicas, deployment, scalingOperation); err != nil { - // Return as soon as we fail, the deployment is requeued - return err - } - } - } - return nil -} - -// rollback rolls a deployment back to a given revision; it is a no-op if toRevision is the deployment's current revision -func (dc *DeploymentController) rollback(deployment *extensions.Deployment, toRevision *int64) (*extensions.Deployment, error) { - newRS, allOldRSs, err := dc.getAllReplicaSetsAndSyncRevision(deployment, true) - if err != nil { - return nil, err - } - allRSs := append(allOldRSs, newRS) - // If the rollback revision is 0, roll back to the last revision - if *toRevision == 0 { - if *toRevision = lastRevision(allRSs); *toRevision == 0 { - // If we still can't find the last revision, give up the rollback - dc.emitRollbackWarningEvent(deployment, deploymentutil.RollbackRevisionNotFound, "Unable to find last revision.") - // Give up the rollback - return dc.updateDeploymentAndClearRollbackTo(deployment) - } - } - for _, rs := range allRSs { - v, err := deploymentutil.Revision(rs) - if err != nil { - glog.V(4).Infof("Unable to extract revision from deployment's replica set %q: %v", rs.Name, err) - continue - } - if v == *toRevision { - glog.V(4).Infof("Found replica set %q with desired revision %d", rs.Name, v) - // roll back by copying podTemplate.Spec from the replica set and incrementing the revision number by 1; - // no-op if the spec matches the current deployment's podTemplate.Spec - deployment, performedRollback, err := dc.rollbackToTemplate(deployment, rs) - if performedRollback && err == nil { - dc.emitRollbackNormalEvent(deployment, fmt.Sprintf("Rolled back deployment %q to revision %d", deployment.Name, *toRevision)) - } - return deployment, err - } - } - dc.emitRollbackWarningEvent(deployment, deploymentutil.RollbackRevisionNotFound, "Unable to find the revision to roll back to.") - // Give up the rollback - return dc.updateDeploymentAndClearRollbackTo(deployment) -} - -func (dc *DeploymentController) emitRollbackWarningEvent(deployment *extensions.Deployment, reason, message string) { - dc.eventRecorder.Eventf(deployment, api.EventTypeWarning, reason, message) -} - -func (dc *DeploymentController) emitRollbackNormalEvent(deployment *extensions.Deployment, message string) { - dc.eventRecorder.Eventf(deployment, api.EventTypeNormal, deploymentutil.RollbackDone, message) -} - -// updateDeploymentAndClearRollbackTo sets
.spec.rollbackTo to nil and updates the input deployment -func (dc *DeploymentController) updateDeploymentAndClearRollbackTo(deployment *extensions.Deployment) (*extensions.Deployment, error) { - glog.V(4).Infof("Clearing rollbackTo of deployment %s", deployment.Name) - deployment.Spec.RollbackTo = nil - return dc.updateDeployment(deployment) -} - -func (dc *DeploymentController) rolloutRecreate(deployment *extensions.Deployment) error { - // Don't create a new RS if one doesn't already exist, so that we avoid scaling up before scaling down - newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(deployment, false) - if err != nil { - return err - } - allRSs := append(oldRSs, newRS) - - // scale down old replica sets - scaledDown, err := dc.scaleDownOldReplicaSetsForRecreate(controller.FilterActiveReplicaSets(oldRSs), deployment) - if err != nil { - return err - } - if scaledDown { - // Update DeploymentStatus - return dc.updateDeploymentStatus(allRSs, newRS, deployment) - } - - // If we need to create a new RS, create it now - // TODO: Create a new RS without re-listing all RSs. - if newRS == nil { - newRS, oldRSs, err = dc.getAllReplicaSetsAndSyncRevision(deployment, true) - if err != nil { - return err - } - allRSs = append(oldRSs, newRS) - } - - // scale up new replica set - scaledUp, err := dc.scaleUpNewReplicaSetForRecreate(newRS, deployment) - if err != nil { - return err - } - if scaledUp { - // Update DeploymentStatus - return dc.updateDeploymentStatus(allRSs, newRS, deployment) - } - - dc.cleanupDeployment(oldRSs, deployment) - - // Sync deployment status - return dc.syncDeploymentStatus(allRSs, newRS, deployment) -} - -func (dc *DeploymentController) rolloutRolling(deployment *extensions.Deployment) error { - newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(deployment, true) - if err != nil { - return err - } - allRSs := append(oldRSs, newRS) - - // Scale up, if we can. - scaledUp, err := dc.reconcileNewReplicaSet(allRSs, newRS, deployment) - if err != nil { - return err - } - if scaledUp { - // Update DeploymentStatus - return dc.updateDeploymentStatus(allRSs, newRS, deployment) - } - - // Scale down, if we can. - scaledDown, err := dc.reconcileOldReplicaSets(allRSs, controller.FilterActiveReplicaSets(oldRSs), newRS, deployment) - if err != nil { - return err - } - if scaledDown { - // Update DeploymentStatus - return dc.updateDeploymentStatus(allRSs, newRS, deployment) - } - - dc.cleanupDeployment(oldRSs, deployment) - - // Sync deployment status - return dc.syncDeploymentStatus(allRSs, newRS, deployment) -} - -// syncDeploymentStatus checks if the status is up-to-date and syncs it if necessary -func (dc *DeploymentController) syncDeploymentStatus(allRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, d *extensions.Deployment) error { - newStatus, err := dc.calculateStatus(allRSs, newRS, d) - if err != nil { - return err - } - if !reflect.DeepEqual(d.Status, newStatus) { - return dc.updateDeploymentStatus(allRSs, newRS, d) - } - return nil -} - -// getAllReplicaSetsAndSyncRevision returns all the replica sets for the provided deployment (new and all old), with new RS's and deployment's revision updated. -// 1. Get all old RSes this deployment targets, and calculate the max revision number among them (maxOldV). -// 2. Get new RS this deployment targets (whose pod template matches deployment's), and update new RS's revision number to (maxOldV + 1), -// only if its revision number is smaller than (maxOldV + 1).
If this step fails, we'll update it in the next deployment sync loop. -// 3. Copy new RS's revision number to deployment (update deployment's revision). If this step fails, we'll update it in the next deployment sync loop. -// Note that currently the deployment controller is using caches to avoid querying the server for reads. -// This may lead to stale reads of replica sets, thus incorrect deployment status. -func (dc *DeploymentController) getAllReplicaSetsAndSyncRevision(deployment *extensions.Deployment, createIfNotExisted bool) (*extensions.ReplicaSet, []*extensions.ReplicaSet, error) { - // List the deployment's RSes & Pods and apply pod-template-hash info to deployment's adopted RSes/Pods - rsList, podList, err := dc.rsAndPodsWithHashKeySynced(deployment) - if err != nil { - return nil, nil, fmt.Errorf("error labeling replica sets and pods with pod-template-hash: %v", err) - } - _, allOldRSs, err := deploymentutil.FindOldReplicaSets(deployment, rsList, podList) - if err != nil { - return nil, nil, err - } - - // Calculate the max revision number among all old RSes - maxOldV := maxRevision(allOldRSs) - - // Get new replica set with the updated revision number - newRS, err := dc.getNewReplicaSet(deployment, rsList, maxOldV, allOldRSs, createIfNotExisted) - if err != nil { - return nil, nil, err - } - - // Sync deployment's revision number with new replica set - if newRS != nil && newRS.Annotations != nil && len(newRS.Annotations[deploymentutil.RevisionAnnotation]) > 0 && - (deployment.Annotations == nil || deployment.Annotations[deploymentutil.RevisionAnnotation] != newRS.Annotations[deploymentutil.RevisionAnnotation]) { - if err = dc.updateDeploymentRevision(deployment, newRS.Annotations[deploymentutil.RevisionAnnotation]); err != nil { - glog.V(4).Infof("Error: %v. Unable to update deployment revision, will retry later.", err) - } - } - - return newRS, allOldRSs, nil -} - -func maxRevision(allRSs []*extensions.ReplicaSet) int64 { - max := int64(0) - for _, rs := range allRSs { - if v, err := deploymentutil.Revision(rs); err != nil { - // Skip replica sets whose revision information failed to parse - glog.V(4).Infof("Error: %v. Couldn't parse revision for replica set %#v, deployment controller will skip it when reconciling revisions.", err, rs) - } else if v > max { - max = v - } - } - return max -} - -// lastRevision finds the second max revision number in all replica sets (the last revision) -func lastRevision(allRSs []*extensions.ReplicaSet) int64 { - max, secMax := int64(0), int64(0) - for _, rs := range allRSs { - if v, err := deploymentutil.Revision(rs); err != nil { - // Skip replica sets whose revision information failed to parse - glog.V(4).Infof("Error: %v. Couldn't parse revision for replica set %#v, deployment controller will skip it when reconciling revisions.", err, rs) - } else if v >= max { - secMax = max - max = v - } else if v > secMax { - secMax = v - } - } - return secMax -} - -// Returns a replica set that matches the intent of the given deployment. Returns nil if the new replica set doesn't exist yet. -// 1. Get existing new RS (the RS that the given deployment targets, whose pod template is the same as deployment's). -// 2. If there's an existing new RS, update its revision number if it's smaller than (maxOldRevision + 1), where maxOldRevision is the max revision number among all old RSes. -// 3. If there's no existing new RS and createIfNotExisted is true, create one with the appropriate revision number (maxOldRevision + 1) and replicas.
-// Note that the pod-template-hash will be added to adopted RSes and pods. -func (dc *DeploymentController) getNewReplicaSet(deployment *extensions.Deployment, rsList []extensions.ReplicaSet, maxOldRevision int64, oldRSs []*extensions.ReplicaSet, createIfNotExisted bool) (*extensions.ReplicaSet, error) { - // Calculate revision number for this new replica set - newRevision := strconv.FormatInt(maxOldRevision+1, 10) - - existingNewRS, err := deploymentutil.FindNewReplicaSet(deployment, rsList) - if err != nil { - return nil, err - } else if existingNewRS != nil { - // Set existing new replica set's annotation - if setNewReplicaSetAnnotations(deployment, existingNewRS, newRevision, true) { - return dc.client.Extensions().ReplicaSets(deployment.ObjectMeta.Namespace).Update(existingNewRS) - } - return existingNewRS, nil - } - - if !createIfNotExisted { - return nil, nil - } - - // new ReplicaSet does not exist, create one. - namespace := deployment.ObjectMeta.Namespace - podTemplateSpecHash := podutil.GetPodTemplateSpecHash(deployment.Spec.Template) - newRSTemplate := deploymentutil.GetNewReplicaSetTemplate(deployment) - // Add podTemplateHash label to selector. - newRSSelector := labelsutil.CloneSelectorAndAddLabel(deployment.Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey, podTemplateSpecHash) - - // Create new ReplicaSet - newRS := extensions.ReplicaSet{ - ObjectMeta: api.ObjectMeta{ - // Make the name deterministic, to ensure idempotence - Name: deployment.Name + "-" + fmt.Sprintf("%d", podTemplateSpecHash), - Namespace: namespace, - }, - Spec: extensions.ReplicaSetSpec{ - Replicas: 0, - Selector: newRSSelector, - Template: newRSTemplate, - }, - } - allRSs := append(oldRSs, &newRS) - newReplicasCount, err := deploymentutil.NewRSNewReplicas(deployment, allRSs, &newRS) - if err != nil { - return nil, err - } - - newRS.Spec.Replicas = newReplicasCount - // Set new replica set's annotation - setNewReplicaSetAnnotations(deployment, &newRS, newRevision, false) - createdRS, err := dc.client.Extensions().ReplicaSets(namespace).Create(&newRS) - if err != nil { - dc.enqueueDeployment(deployment) - return nil, fmt.Errorf("error creating replica set %v: %v", deployment.Name, err) - } - if newReplicasCount > 0 { - dc.eventRecorder.Eventf(deployment, api.EventTypeNormal, "ScalingReplicaSet", "Scaled %s replica set %s to %d", "up", createdRS.Name, newReplicasCount) - } - - return createdRS, dc.updateDeploymentRevision(deployment, newRevision) -} - -// rsAndPodsWithHashKeySynced returns the RSes and pods the given deployment targets, with pod-template-hash information synced. -func (dc *DeploymentController) rsAndPodsWithHashKeySynced(deployment *extensions.Deployment) ([]extensions.ReplicaSet, *api.PodList, error) { - rsList, err := deploymentutil.ListReplicaSets(deployment, - func(namespace string, options api.ListOptions) ([]extensions.ReplicaSet, error) { - return dc.rsStore.ReplicaSets(namespace).List(options.LabelSelector) - }) - if err != nil { - return nil, nil, fmt.Errorf("error listing ReplicaSets: %v", err) - } - syncedRSList := []extensions.ReplicaSet{} - for _, rs := range rsList { - // Add pod-template-hash information if it's not in the RS. - // Otherwise, new RS produced by Deployment will overlap with pre-existing ones - // that aren't constrained by the pod-template-hash. 
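For illustration, here is a minimal, hypothetical sketch of how a pod-template hash turns into a deterministic replica set name and a pod-template-hash label value (the real podutil.GetPodTemplateSpecHash feeds the entire serialized PodTemplateSpec into a 32-bit hash rather than a raw string, so treat the hashing details here as illustrative):

package main

import (
	"fmt"
	"hash/fnv"
)

// templateHash mimics the shape of the pod-template hashing described above:
// a 32-bit digest of the pod template. The digest doubles as the new replica
// set's name suffix (making creation idempotent) and as the value of the
// pod-template-hash label (keeping replica sets from different template
// revisions from selecting each other's pods).
func templateHash(serializedTemplate string) uint32 {
	h := fnv.New32a()
	h.Write([]byte(serializedTemplate))
	return h.Sum32()
}

func main() {
	hash := templateHash(`containers: [{image: nginx:1.9.1}]`)
	fmt.Printf("name: foo-%d\n", hash)
	fmt.Printf("selector: pod-template-hash=%d\n", hash)
}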
- syncedRS, err := dc.addHashKeyToRSAndPods(rs) - if err != nil { - return nil, nil, err - } - syncedRSList = append(syncedRSList, *syncedRS) - } - syncedPodList, err := dc.listPods(deployment) - if err != nil { - return nil, nil, err - } - return syncedRSList, syncedPodList, nil -} - -func (dc *DeploymentController) listPods(deployment *extensions.Deployment) (*api.PodList, error) { - return deploymentutil.ListPods(deployment, - func(namespace string, options api.ListOptions) (*api.PodList, error) { - podList, err := dc.podStore.Pods(namespace).List(options.LabelSelector) - return &podList, err - }) -} - -// addHashKeyToRSAndPods adds pod-template-hash information to the given rs, if it's not already there, with the following steps: -// 1. Add hash label to the rs's pod template, and make sure the controller sees this update so that no orphaned pods will be created -// 2. Add hash label to all pods this rs owns, wait until replicaset controller reports rs.Status.FullyLabeledReplicas equal to the desired number of replicas -// 3. Add hash label to the rs's label and selector -func (dc *DeploymentController) addHashKeyToRSAndPods(rs extensions.ReplicaSet) (updatedRS *extensions.ReplicaSet, err error) { - updatedRS = &rs - // If the rs already has the new hash label in its selector, it's done syncing - if labelsutil.SelectorHasLabel(rs.Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey) { - return - } - namespace := rs.Namespace - hash := rsutil.GetPodTemplateSpecHash(rs) - rsUpdated := false - // 1. Add hash template label to the rs. This ensures that any newly created pods will have the new label. - updatedRS, rsUpdated, err = rsutil.UpdateRSWithRetries(dc.client.Extensions().ReplicaSets(namespace), updatedRS, - func(updated *extensions.ReplicaSet) error { - // Precondition: the RS doesn't contain the new hash in its pod template label. - if updated.Spec.Template.Labels[extensions.DefaultDeploymentUniqueLabelKey] == hash { - return utilerrors.ErrPreconditionViolated - } - updated.Spec.Template.Labels = labelsutil.AddLabel(updated.Spec.Template.Labels, extensions.DefaultDeploymentUniqueLabelKey, hash) - return nil - }) - if err != nil { - return nil, fmt.Errorf("error updating %s %s/%s pod template label with template hash: %v", updatedRS.Kind, updatedRS.Namespace, updatedRS.Name, err) - } - if !rsUpdated { - // If RS wasn't updated but didn't return error in step 1, we've hit a RS not found error. - // Return here and retry in the next sync loop. - return &rs, nil - } - // Make sure rs pod template is updated so that it won't create pods without the new label (orphaned pods). - if updatedRS.Generation > updatedRS.Status.ObservedGeneration { - if err = deploymentutil.WaitForReplicaSetUpdated(dc.client, updatedRS.Generation, namespace, updatedRS.Name); err != nil { - return nil, fmt.Errorf("error waiting for %s %s/%s generation %d observed by controller: %v", updatedRS.Kind, updatedRS.Namespace, updatedRS.Name, updatedRS.Generation, err) - } - } - glog.V(4).Infof("Observed the update of %s %s/%s's pod template with hash %s.", rs.Kind, rs.Namespace, rs.Name, hash) - - // 2. Update all pods managed by the rs to have the new hash label, so they will be correctly adopted. 
- selector, err := unversioned.LabelSelectorAsSelector(updatedRS.Spec.Selector) - if err != nil { - return nil, fmt.Errorf("error in converting selector to label selector for replica set %s: %s", updatedRS.Name, err) - } - options := api.ListOptions{LabelSelector: selector} - podList, err := dc.podStore.Pods(namespace).List(options.LabelSelector) - if err != nil { - return nil, fmt.Errorf("error in getting pod list for namespace %s and list options %+v: %s", namespace, options, err) - } - allPodsLabeled := false - if allPodsLabeled, err = deploymentutil.LabelPodsWithHash(&podList, updatedRS, dc.client, namespace, hash); err != nil { - return nil, fmt.Errorf("error in adding template hash label %s to pods %+v: %s", hash, podList, err) - } - // If not all pods are labeled but step 2 didn't return an error, we've hit at least one pod-not-found error. - // Return here and retry in the next sync loop. - if !allPodsLabeled { - return updatedRS, nil - } - - // We need to wait for the replicaset controller to observe the pods being - // labeled with pod template hash. Because previously we've called - // WaitForReplicaSetUpdated, the replicaset controller should have dropped - // FullyLabeledReplicas to 0 already, so we only need to wait for it to increase - // back to the number of replicas in the spec. - if err = deploymentutil.WaitForPodsHashPopulated(dc.client, updatedRS.Generation, namespace, updatedRS.Name); err != nil { - return nil, fmt.Errorf("%s %s/%s: error waiting for replicaset controller to observe pods being labeled with template hash: %v", updatedRS.Kind, updatedRS.Namespace, updatedRS.Name, err) - } - - // 3. Update rs label and selector to include the new hash label - // Copy the old selector, so that we can scrub out any orphaned pods - if updatedRS, rsUpdated, err = rsutil.UpdateRSWithRetries(dc.client.Extensions().ReplicaSets(namespace), updatedRS, - func(updated *extensions.ReplicaSet) error { - // Precondition: the RS doesn't contain the new hash in its label or selector. - if updated.Labels[extensions.DefaultDeploymentUniqueLabelKey] == hash && updated.Spec.Selector.MatchLabels[extensions.DefaultDeploymentUniqueLabelKey] == hash { - return utilerrors.ErrPreconditionViolated - } - updated.Labels = labelsutil.AddLabel(updated.Labels, extensions.DefaultDeploymentUniqueLabelKey, hash) - updated.Spec.Selector = labelsutil.AddLabelToSelector(updated.Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey, hash) - return nil - }); err != nil { - return nil, fmt.Errorf("error updating %s %s/%s label and selector with template hash: %v", updatedRS.Kind, updatedRS.Namespace, updatedRS.Name, err) - } - if rsUpdated { - glog.V(4).Infof("Updated %s %s/%s's selector and label with hash %s.", rs.Kind, rs.Namespace, rs.Name, hash) - } - // If the RS isn't actually updated in step 3, that's okay, we'll retry in the next sync loop since its selector isn't updated yet. - - // TODO: look for orphaned pods and label them in the background somewhere else periodically - - return updatedRS, nil -} - -// setNewReplicaSetAnnotations sets new replica set's annotations appropriately by updating its revision and -// copying required deployment annotations to it; it returns true if the replica set's annotations changed.
-func setNewReplicaSetAnnotations(deployment *extensions.Deployment, newRS *extensions.ReplicaSet, newRevision string, exists bool) bool { - // First, copy deployment's annotations (except for apply and revision annotations) - annotationChanged := copyDeploymentAnnotationsToReplicaSet(deployment, newRS) - // Then, update replica set's revision annotation - if newRS.Annotations == nil { - newRS.Annotations = make(map[string]string) - } - // The newRS's revision should be the greatest among all RSes. Usually, its revision number is newRevision (the max revision number - // of all old RSes + 1). However, it's possible that some of the old RSes are deleted after the newRS revision being updated, and - // newRevision becomes smaller than newRS's revision. We should only update newRS revision when it's smaller than newRevision. - if newRS.Annotations[deploymentutil.RevisionAnnotation] < newRevision { - newRS.Annotations[deploymentutil.RevisionAnnotation] = newRevision - annotationChanged = true - glog.V(4).Infof("Updating replica set %q revision to %s", newRS.Name, newRevision) - } - if !exists && setReplicasAnnotations(newRS, deployment.Spec.Replicas, deployment.Spec.Replicas+maxSurge(*deployment)) { - annotationChanged = true - } - return annotationChanged -} - -var annotationsToSkip = map[string]bool{ - annotations.LastAppliedConfigAnnotation: true, - deploymentutil.RevisionAnnotation: true, - deploymentutil.DesiredReplicasAnnotation: true, - deploymentutil.MaxReplicasAnnotation: true, -} - -// skipCopyAnnotation returns true if we should skip copying the annotation with the given annotation key -// TODO: How to decide which annotations should / should not be copied? -// See https://github.com/kubernetes/kubernetes/pull/20035#issuecomment-179558615 -func skipCopyAnnotation(key string) bool { - return annotationsToSkip[key] -} - -func getSkippedAnnotations(annotations map[string]string) map[string]string { - skippedAnnotations := make(map[string]string) - for k, v := range annotations { - if skipCopyAnnotation(k) { - skippedAnnotations[k] = v - } - } - return skippedAnnotations -} - -// copyDeploymentAnnotationsToReplicaSet copies deployment's annotations to replica set's annotations, -// and returns true if replica set's annotation is changed. -// Note that apply and revision annotations are not copied. -func copyDeploymentAnnotationsToReplicaSet(deployment *extensions.Deployment, rs *extensions.ReplicaSet) bool { - rsAnnotationsChanged := false - if rs.Annotations == nil { - rs.Annotations = make(map[string]string) - } - for k, v := range deployment.Annotations { - // newRS revision is updated automatically in getNewReplicaSet, and the deployment's revision number is then updated - // by copying its newRS revision number. We should not copy deployment's revision to its newRS, since the update of - // deployment revision number may fail (revision becomes stale) and the revision number in newRS is more reliable. - if skipCopyAnnotation(k) || rs.Annotations[k] == v { - continue - } - rs.Annotations[k] = v - rsAnnotationsChanged = true - } - return rsAnnotationsChanged -} - -// setDeploymentAnnotationsTo sets deployment's annotations as given RS's annotations. -// This action should be done if and only if the deployment is rolling back to this rs. -// Note that apply and revision annotations are not changed. 
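The annotation copy-with-skip-list behavior used by the helpers in this section reduces to a few lines. In this toy version the skip keys mirror the annotationsToSkip set (the key strings are the upstream annotation names; the rest of the sketch is illustrative):

package main

import "fmt"

// Annotations owned by the deployment machinery itself are never copied to
// (or from) replica sets; everything else is synced verbatim.
var skip = map[string]bool{
	"kubectl.kubernetes.io/last-applied-configuration": true,
	"deployment.kubernetes.io/revision":                true,
	"deployment.kubernetes.io/desired-replicas":        true,
	"deployment.kubernetes.io/max-replicas":            true,
}

// copyAnnotations copies non-skipped annotations from src to dst and reports
// whether dst changed, mirroring copyDeploymentAnnotationsToReplicaSet.
func copyAnnotations(src, dst map[string]string) bool {
	changed := false
	for k, v := range src {
		if skip[k] || dst[k] == v {
			continue
		}
		dst[k] = v
		changed = true
	}
	return changed
}

func main() {
	d := map[string]string{
		"kubernetes.io/change-cause":        "edit",
		"deployment.kubernetes.io/revision": "3",
	}
	rs := map[string]string{}
	fmt.Println(copyAnnotations(d, rs), rs) // true map[kubernetes.io/change-cause:edit]
}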
-func setDeploymentAnnotationsTo(deployment *extensions.Deployment, rollbackToRS *extensions.ReplicaSet) { - deployment.Annotations = getSkippedAnnotations(deployment.Annotations) - for k, v := range rollbackToRS.Annotations { - if !skipCopyAnnotation(k) { - deployment.Annotations[k] = v - } - } -} - -func (dc *DeploymentController) updateDeploymentRevision(deployment *extensions.Deployment, revision string) error { - if deployment.Annotations == nil { - deployment.Annotations = make(map[string]string) - } - if deployment.Annotations[deploymentutil.RevisionAnnotation] != revision { - deployment.Annotations[deploymentutil.RevisionAnnotation] = revision - _, err := dc.updateDeployment(deployment) - return err - } - return nil -} - -func (dc *DeploymentController) reconcileNewReplicaSet(allRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, deployment *extensions.Deployment) (bool, error) { - if newRS.Spec.Replicas == deployment.Spec.Replicas { - // Scaling not required. - return false, nil - } - if newRS.Spec.Replicas > deployment.Spec.Replicas { - // Scale down. - scaled, _, err := dc.scaleReplicaSetAndRecordEvent(newRS, deployment.Spec.Replicas, deployment) - return scaled, err - } - newReplicasCount, err := deploymentutil.NewRSNewReplicas(deployment, allRSs, newRS) - if err != nil { - return false, err - } - scaled, _, err := dc.scaleReplicaSetAndRecordEvent(newRS, newReplicasCount, deployment) - return scaled, err -} - -func (dc *DeploymentController) getAvailablePodsForReplicaSets(deployment *extensions.Deployment, rss []*extensions.ReplicaSet) (int32, error) { - podList, err := dc.listPods(deployment) - if err != nil { - return 0, err - } - return deploymentutil.CountAvailablePodsForReplicaSets(podList, rss, deployment.Spec.MinReadySeconds) -} - -func (dc *DeploymentController) reconcileOldReplicaSets(allRSs []*extensions.ReplicaSet, oldRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, deployment *extensions.Deployment) (bool, error) { - oldPodsCount := deploymentutil.GetReplicaCountForReplicaSets(oldRSs) - if oldPodsCount == 0 { - // Can't scale down further - return false, nil - } - - minReadySeconds := deployment.Spec.MinReadySeconds - allPodsCount := deploymentutil.GetReplicaCountForReplicaSets(allRSs) - // TODO: use dc.getAvailablePodsForReplicaSets instead - newRSAvailablePodCount, err := deploymentutil.GetAvailablePodsForReplicaSets(dc.client, deployment, []*extensions.ReplicaSet{newRS}, minReadySeconds) - if err != nil { - return false, fmt.Errorf("could not find available pods: %v", err) - } - maxUnavailable := maxUnavailable(*deployment) - - // Check if we can scale down. We can scale down in the following 2 cases: - // * Some old replica sets have unhealthy replicas; we can safely scale down those unhealthy replicas since that won't further - // increase unavailability. - // * The new replica set has scaled up and its replicas have become ready; then we can scale down old replica sets in a further step. - // - // maxScaledDown := allPodsCount - minAvailable - newReplicaSetPodsUnavailable - // We take into account not only maxUnavailable and any surge pods that have been created, but also unavailable pods from - // the newRS, so that the unavailable pods from the newRS won't make us scale down old replica sets in a further - // step (which would increase unavailability).
- // - // Concrete example: - // - // * 10 replicas - // * 2 maxUnavailable (absolute number, not percent) - // * 3 maxSurge (absolute number, not percent) - // - // case 1: - // * Deployment is updated, newRS is created with 3 replicas, oldRS is scaled down to 8, and newRS is scaled up to 5. - // * The new replica set pods crashloop and never become available. - // * allPodsCount is 13. minAvailable is 8. newRSPodsUnavailable is 5. - // * A node fails and causes one of the oldRS pods to become unavailable. However, 13 - 8 - 5 = 0, so the oldRS won't be scaled down. - // * The user notices the crashloop and runs kubectl rollout undo to roll back. - // * newRSPodsUnavailable is 1, since we rolled back to the good replica set, so maxScaledDown = 13 - 8 - 1 = 4. 4 of the crashlooping pods will be scaled down. - // * The total number of pods will then be 9 and the newRS can be scaled up to 10. - // - // case 2: - // Same example, but pushing a new pod template instead of rolling back (aka "roll over"): - // * The new replica set created must start with 0 replicas because allPodsCount is already at 13. - // * However, newRSPodsUnavailable would also be 0, so the 2 old replica sets could be scaled down by 5 (13 - 8 - 0), which would then - // allow the new replica set to be scaled up by 5. - minAvailable := deployment.Spec.Replicas - maxUnavailable - newRSUnavailablePodCount := newRS.Spec.Replicas - newRSAvailablePodCount - maxScaledDown := allPodsCount - minAvailable - newRSUnavailablePodCount - if maxScaledDown <= 0 { - return false, nil - } - - // Clean up unhealthy replicas first; otherwise unhealthy replicas will block the deployment - // and cause a timeout. See https://github.com/kubernetes/kubernetes/issues/16737 - oldRSs, cleanupCount, err := dc.cleanupUnhealthyReplicas(oldRSs, deployment, deployment.Spec.MinReadySeconds, maxScaledDown) - if err != nil { - return false, nil - } - glog.V(4).Infof("Cleaned up unhealthy replicas from old RSes by %d", cleanupCount) - - // Scale down old replica sets; we need to check maxUnavailable to ensure we can scale down - allRSs = append(oldRSs, newRS) - scaledDownCount, err := dc.scaleDownOldReplicaSetsForRollingUpdate(allRSs, oldRSs, deployment) - if err != nil { - return false, nil - } - glog.V(4).Infof("Scaled down old RSes of deployment %s by %d", deployment.Name, scaledDownCount) - - totalScaledDown := cleanupCount + scaledDownCount - return totalScaledDown > 0, nil -} - -// cleanupUnhealthyReplicas will scale down old replica sets with unhealthy replicas, so that all unhealthy replicas will be deleted. -func (dc *DeploymentController) cleanupUnhealthyReplicas(oldRSs []*extensions.ReplicaSet, deployment *extensions.Deployment, minReadySeconds, maxCleanupCount int32) ([]*extensions.ReplicaSet, int32, error) { - sort.Sort(controller.ReplicaSetsByCreationTimestamp(oldRSs)) - // Safely scale down all old replica sets with unhealthy replicas. The replica set controller will sort the pods in the order - // such that not-ready < ready, unscheduled < scheduled, and pending < running. This ensures that unhealthy replicas will - // be deleted first and won't increase unavailability. - totalScaledDown := int32(0) - for i, targetRS := range oldRSs { - if totalScaledDown >= maxCleanupCount { - break - } - if targetRS.Spec.Replicas == 0 { - // cannot scale down this replica set.
- continue - } - // TODO: use dc.getAvailablePodsForReplicaSets instead - availablePodCount, err := deploymentutil.GetAvailablePodsForReplicaSets(dc.client, deployment, []*extensions.ReplicaSet{targetRS}, minReadySeconds) - if err != nil { - return nil, totalScaledDown, fmt.Errorf("could not find available pods: %v", err) - } - if targetRS.Spec.Replicas == availablePodCount { - // no unhealthy replicas found, no scaling required. - continue - } - - scaledDownCount := int32(integer.IntMin(int(maxCleanupCount-totalScaledDown), int(targetRS.Spec.Replicas-availablePodCount))) - newReplicasCount := targetRS.Spec.Replicas - scaledDownCount - if newReplicasCount > targetRS.Spec.Replicas { - return nil, 0, fmt.Errorf("when cleaning up unhealthy replicas, got invalid request to scale down %s/%s %d -> %d", targetRS.Namespace, targetRS.Name, targetRS.Spec.Replicas, newReplicasCount) - } - _, updatedOldRS, err := dc.scaleReplicaSetAndRecordEvent(targetRS, newReplicasCount, deployment) - if err != nil { - return nil, totalScaledDown, err - } - totalScaledDown += scaledDownCount - oldRSs[i] = updatedOldRS - } - return oldRSs, totalScaledDown, nil -} - -// scaleDownOldReplicaSetsForRollingUpdate scales down old replica sets when deployment strategy is "RollingUpdate". -// We need to check maxUnavailable to ensure availability. -func (dc *DeploymentController) scaleDownOldReplicaSetsForRollingUpdate(allRSs []*extensions.ReplicaSet, oldRSs []*extensions.ReplicaSet, deployment *extensions.Deployment) (int32, error) { - maxUnavailable := maxUnavailable(*deployment) - - // Check if we can scale down. - minAvailable := deployment.Spec.Replicas - maxUnavailable - minReadySeconds := deployment.Spec.MinReadySeconds - // Find the number of available pods. - // TODO: use dc.getAvailablePodsForReplicaSets instead - availablePodCount, err := deploymentutil.GetAvailablePodsForReplicaSets(dc.client, deployment, allRSs, minReadySeconds) - if err != nil { - return 0, fmt.Errorf("could not find available pods: %v", err) - } - if availablePodCount <= minAvailable { - // Cannot scale down. - return 0, nil - } - glog.V(4).Infof("Found %d available pods in deployment %s, scaling down old RSes", availablePodCount, deployment.Name) - - sort.Sort(controller.ReplicaSetsByCreationTimestamp(oldRSs)) - - totalScaledDown := int32(0) - totalScaleDownCount := availablePodCount - minAvailable - for _, targetRS := range oldRSs { - if totalScaledDown >= totalScaleDownCount { - // No further scaling required. - break - } - if targetRS.Spec.Replicas == 0 { - // cannot scale down this ReplicaSet. - continue - } - // Scale down.
- scaleDownCount := int32(integer.IntMin(int(targetRS.Spec.Replicas), int(totalScaleDownCount-totalScaledDown))) - newReplicasCount := targetRS.Spec.Replicas - scaleDownCount - if newReplicasCount > targetRS.Spec.Replicas { - return 0, fmt.Errorf("when scaling down old RS, got invalid request to scale down %s/%s %d -> %d", targetRS.Namespace, targetRS.Name, targetRS.Spec.Replicas, newReplicasCount) - } - _, _, err = dc.scaleReplicaSetAndRecordEvent(targetRS, newReplicasCount, deployment) - if err != nil { - return totalScaledDown, err - } - - totalScaledDown += scaleDownCount - } - - return totalScaledDown, nil -} - -// scaleDownOldReplicaSetsForRecreate scales down old replica sets when deployment strategy is "Recreate" -func (dc *DeploymentController) scaleDownOldReplicaSetsForRecreate(oldRSs []*extensions.ReplicaSet, deployment *extensions.Deployment) (bool, error) { - scaled := false - for _, rs := range oldRSs { - // Scaling not required. - if rs.Spec.Replicas == 0 { - continue - } - scaledRS, _, err := dc.scaleReplicaSetAndRecordEvent(rs, 0, deployment) - if err != nil { - return false, err - } - if scaledRS { - scaled = true - } - } - return scaled, nil -} - -// scaleUpNewReplicaSetForRecreate scales up new replica set when deployment strategy is "Recreate" -func (dc *DeploymentController) scaleUpNewReplicaSetForRecreate(newRS *extensions.ReplicaSet, deployment *extensions.Deployment) (bool, error) { - scaled, _, err := dc.scaleReplicaSetAndRecordEvent(newRS, deployment.Spec.Replicas, deployment) - return scaled, err -} - -// cleanupDeployment is responsible for cleaning up a deployment, i.e., it retains all but the latest N old replica sets, -// where N=d.Spec.RevisionHistoryLimit. Old replica sets are older versions of a deployment's pod template that are kept -// around by default 1) for historical reasons and 2) for the ability to roll back a deployment. -func (dc *DeploymentController) cleanupDeployment(oldRSs []*extensions.ReplicaSet, deployment *extensions.Deployment) error { - if deployment.Spec.RevisionHistoryLimit == nil { - return nil - } - diff := int32(len(oldRSs)) - *deployment.Spec.RevisionHistoryLimit - if diff <= 0 { - return nil - } - - sort.Sort(controller.ReplicaSetsByCreationTimestamp(oldRSs)) - - var errList []error - // TODO: This should be parallelized.
- for i := int32(0); i < diff; i++ { - rs := oldRSs[i] - // Avoid deleting replica sets with non-zero replica counts - if rs.Status.Replicas != 0 || rs.Spec.Replicas != 0 || rs.Generation > rs.Status.ObservedGeneration { - continue - } - if err := dc.client.Extensions().ReplicaSets(rs.Namespace).Delete(rs.Name, nil); err != nil && !errors.IsNotFound(err) { - glog.V(2).Infof("Failed deleting old replica set %v for deployment %v: %v", rs.Name, deployment.Name, err) - errList = append(errList, err) - } - } - - return utilerrors.NewAggregate(errList) -} - -func (dc *DeploymentController) updateDeploymentStatus(allRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, deployment *extensions.Deployment) error { - newStatus, err := dc.calculateStatus(allRSs, newRS, deployment) - if err != nil { - return err - } - newDeployment := deployment - newDeployment.Status = newStatus - _, err = dc.client.Extensions().Deployments(deployment.Namespace).UpdateStatus(newDeployment) - return err -} - -func (dc *DeploymentController) calculateStatus(allRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, deployment *extensions.Deployment) (extensions.DeploymentStatus, error) { - availableReplicas, err := dc.getAvailablePodsForReplicaSets(deployment, allRSs) - if err != nil { - return deployment.Status, fmt.Errorf("failed to count available pods: %v", err) - } - totalReplicas := deploymentutil.GetReplicaCountForReplicaSets(allRSs) - - return extensions.DeploymentStatus{ - // TODO: Ensure that if we start retrying status updates, we won't pick up a new Generation value. - ObservedGeneration: deployment.Generation, - Replicas: deploymentutil.GetActualReplicaCountForReplicaSets(allRSs), - UpdatedReplicas: deploymentutil.GetActualReplicaCountForReplicaSets([]*extensions.ReplicaSet{newRS}), - AvailableReplicas: availableReplicas, - UnavailableReplicas: totalReplicas - availableReplicas, - }, nil -} - -func (dc *DeploymentController) scaleReplicaSetAndRecordEvent(rs *extensions.ReplicaSet, newScale int32, deployment *extensions.Deployment) (bool, *extensions.ReplicaSet, error) { - // No need to scale - if rs.Spec.Replicas == newScale { - return false, rs, nil - } - var scalingOperation string - if rs.Spec.Replicas < newScale { - scalingOperation = "up" - } else { - scalingOperation = "down" - } - newRS, err := dc.scaleReplicaSet(rs, newScale, deployment, scalingOperation) - return true, newRS, err -} - -func (dc *DeploymentController) scaleReplicaSet(rs *extensions.ReplicaSet, newScale int32, deployment *extensions.Deployment, scalingOperation string) (*extensions.ReplicaSet, error) { - // NOTE: This mutates the ReplicaSet passed in. Not sure if that's a good idea.
- rs.Spec.Replicas = newScale - setReplicasAnnotations(rs, deployment.Spec.Replicas, deployment.Spec.Replicas+maxSurge(*deployment)) - rs, err := dc.client.Extensions().ReplicaSets(rs.ObjectMeta.Namespace).Update(rs) - if err == nil { - dc.eventRecorder.Eventf(deployment, api.EventTypeNormal, "ScalingReplicaSet", "Scaled %s replica set %s to %d", scalingOperation, rs.Name, newScale) - } else { - glog.Warningf("Cannot update replica set %q: %v", rs.Name, err) - dc.enqueueDeployment(deployment) - } - return rs, err -} - -func (dc *DeploymentController) updateDeployment(deployment *extensions.Deployment) (*extensions.Deployment, error) { - return dc.client.Extensions().Deployments(deployment.ObjectMeta.Namespace).Update(deployment) -} - -func (dc *DeploymentController) rollbackToTemplate(deployment *extensions.Deployment, rs *extensions.ReplicaSet) (d *extensions.Deployment, performedRollback bool, err error) { - if !reflect.DeepEqual(deploymentutil.GetNewReplicaSetTemplate(deployment), rs.Spec.Template) { - glog.Infof("Rolling back deployment %s to template spec %+v", deployment.Name, rs.Spec.Template.Spec) - deploymentutil.SetFromReplicaSetTemplate(deployment, rs.Spec.Template) - // set RS (the old RS we're rolling back to) annotations back to the deployment; - // otherwise, the deployment's current annotations (should be the same as current new RS) will be copied to the RS after the rollback. - // - // For example, - // A Deployment has old RS1 with annotation {change-cause:create}, and new RS2 {change-cause:edit}. - // Note that both annotations are copied from Deployment, and the Deployment should be annotated {change-cause:edit} as well. - // Now, when we roll back the Deployment to RS1, we should update the Deployment's pod template and also copy the annotation from RS1. - // Deployment is now annotated {change-cause:create}, and we have new RS1 {change-cause:create}, old RS2 {change-cause:edit}. - // - // If we don't copy the annotations back from RS to deployment on rollback, the Deployment will stay as {change-cause:edit}, - // and new RS1 becomes {change-cause:edit} (copied from deployment after rollback), old RS2 {change-cause:edit}, which is not correct. - setDeploymentAnnotationsTo(deployment, rs) - performedRollback = true - } else { - glog.V(4).Infof("Rolling back to a revision that contains the same template as current deployment %s, skipping rollback...", deployment.Name) - dc.emitRollbackWarningEvent(deployment, deploymentutil.RollbackTemplateUnchanged, fmt.Sprintf("The rollback revision contains the same template as current deployment %q", deployment.Name)) - } - d, err = dc.updateDeploymentAndClearRollbackTo(deployment) - return -} - -// isScalingEvent checks whether the provided deployment has been updated with a scaling event -// by looking at the desired-replicas annotation in the active replica sets of the deployment.
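Reduced to its essentials, that desired-replicas check reads roughly as follows (a hypothetical distillation; the real method iterates over all active replica sets via getDesiredReplicasAnnotation):

package main

import (
	"fmt"
	"strconv"
)

// scalingEvent reports whether a replica set's recorded desired-replicas
// annotation disagrees with the deployment's current .spec.replicas, which
// is how the controller distinguishes a pure scaling event from a rollout.
func scalingEvent(annotations map[string]string, specReplicas int32) bool {
	v, ok := annotations["deployment.kubernetes.io/desired-replicas"]
	if !ok {
		return false
	}
	desired, err := strconv.ParseInt(v, 10, 32)
	return err == nil && int32(desired) != specReplicas
}

func main() {
	rs := map[string]string{"deployment.kubernetes.io/desired-replicas": "10"}
	// The replica set was last synced when the deployment wanted 10 replicas;
	// the deployment now wants 12, so this is a scaling event.
	fmt.Println(scalingEvent(rs, 12)) // true
}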
-func (dc *DeploymentController) isScalingEvent(d *extensions.Deployment) bool { - newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, false) - if err != nil { - return false - } - // If there is no new replica set matching this deployment and the deployment isn't paused - // then there is a new rollout that waits to happen - if newRS == nil && !d.Spec.Paused { - return false - } - allRSs := append(oldRSs, newRS) - for _, rs := range controller.FilterActiveReplicaSets(allRSs) { - desired, ok := getDesiredReplicasAnnotation(rs) - if !ok { - continue - } - if desired != d.Spec.Replicas { - return true - } - } - return false -} diff --git a/pkg/controller/deployment/deployment_controller_test.go b/pkg/controller/deployment/deployment_controller_test.go index d9c2b181059..9ade955fff7 100644 --- a/pkg/controller/deployment/deployment_controller_test.go +++ b/pkg/controller/deployment/deployment_controller_test.go @@ -19,7 +19,6 @@ package deployment import ( "fmt" "testing" - "time" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/testapi" @@ -144,805 +143,6 @@ func newReplicaSet(d *exp.Deployment, name string, replicas int) *exp.ReplicaSet } } -// TestScale tests proportional scaling of deployments. Note that fenceposts for -// rolling out (maxUnavailable, maxSurge) have no meaning for simple scaling other -// than recording maxSurge as part of the max-replicas annotation that is taken -// into account in the next scale event (max-replicas is used for calculating the -// proportion of a replica set). -func TestScale(t *testing.T) { - newTimestamp := unversioned.Date(2016, 5, 20, 2, 0, 0, 0, time.UTC) - oldTimestamp := unversioned.Date(2016, 5, 20, 1, 0, 0, 0, time.UTC) - olderTimestamp := unversioned.Date(2016, 5, 20, 0, 0, 0, 0, time.UTC) - - tests := []struct { - name string - deployment *exp.Deployment - oldDeployment *exp.Deployment - - newRS *exp.ReplicaSet - oldRSs []*exp.ReplicaSet - - expectedNew *exp.ReplicaSet - expectedOld []*exp.ReplicaSet - - desiredReplicasAnnotations map[string]int32 - }{ - { - name: "normal scaling event: 10 -> 12", - deployment: newDeployment(12, nil), - oldDeployment: newDeployment(10, nil), - - newRS: rs("foo-v1", 10, nil, newTimestamp), - oldRSs: []*exp.ReplicaSet{}, - - expectedNew: rs("foo-v1", 12, nil, newTimestamp), - expectedOld: []*exp.ReplicaSet{}, - }, - { - name: "normal scaling event: 10 -> 5", - deployment: newDeployment(5, nil), - oldDeployment: newDeployment(10, nil), - - newRS: rs("foo-v1", 10, nil, newTimestamp), - oldRSs: []*exp.ReplicaSet{}, - - expectedNew: rs("foo-v1", 5, nil, newTimestamp), - expectedOld: []*exp.ReplicaSet{}, - }, - { - name: "proportional scaling: 5 -> 10", - deployment: newDeployment(10, nil), - oldDeployment: newDeployment(5, nil), - - newRS: rs("foo-v2", 2, nil, newTimestamp), - oldRSs: []*exp.ReplicaSet{rs("foo-v1", 3, nil, oldTimestamp)}, - - expectedNew: rs("foo-v2", 4, nil, newTimestamp), - expectedOld: []*exp.ReplicaSet{rs("foo-v1", 6, nil, oldTimestamp)}, - }, - { - name: "proportional scaling: 5 -> 3", - deployment: newDeployment(3, nil), - oldDeployment: newDeployment(5, nil), - - newRS: rs("foo-v2", 2, nil, newTimestamp), - oldRSs: []*exp.ReplicaSet{rs("foo-v1", 3, nil, oldTimestamp)}, - - expectedNew: rs("foo-v2", 1, nil, newTimestamp), - expectedOld: []*exp.ReplicaSet{rs("foo-v1", 2, nil, oldTimestamp)}, - }, - { - name: "proportional scaling: 9 -> 4", - deployment: newDeployment(4, nil), - oldDeployment: newDeployment(9, nil), - - newRS: rs("foo-v2", 8, nil, newTimestamp), - oldRSs: 
[]*exp.ReplicaSet{rs("foo-v1", 1, nil, oldTimestamp)}, - - expectedNew: rs("foo-v2", 4, nil, newTimestamp), - expectedOld: []*exp.ReplicaSet{rs("foo-v1", 0, nil, oldTimestamp)}, - }, - { - name: "proportional scaling: 7 -> 10", - deployment: newDeployment(10, nil), - oldDeployment: newDeployment(7, nil), - - newRS: rs("foo-v3", 2, nil, newTimestamp), - oldRSs: []*exp.ReplicaSet{rs("foo-v2", 3, nil, oldTimestamp), rs("foo-v1", 2, nil, olderTimestamp)}, - - expectedNew: rs("foo-v3", 3, nil, newTimestamp), - expectedOld: []*exp.ReplicaSet{rs("foo-v2", 4, nil, oldTimestamp), rs("foo-v1", 3, nil, olderTimestamp)}, - }, - { - name: "proportional scaling: 13 -> 8", - deployment: newDeployment(8, nil), - oldDeployment: newDeployment(13, nil), - - newRS: rs("foo-v3", 2, nil, newTimestamp), - oldRSs: []*exp.ReplicaSet{rs("foo-v2", 8, nil, oldTimestamp), rs("foo-v1", 3, nil, olderTimestamp)}, - - expectedNew: rs("foo-v3", 1, nil, newTimestamp), - expectedOld: []*exp.ReplicaSet{rs("foo-v2", 5, nil, oldTimestamp), rs("foo-v1", 2, nil, olderTimestamp)}, - }, - // Scales up the new replica set. - { - name: "leftover distribution: 3 -> 4", - deployment: newDeployment(4, nil), - oldDeployment: newDeployment(3, nil), - - newRS: rs("foo-v3", 1, nil, newTimestamp), - oldRSs: []*exp.ReplicaSet{rs("foo-v2", 1, nil, oldTimestamp), rs("foo-v1", 1, nil, olderTimestamp)}, - - expectedNew: rs("foo-v3", 2, nil, newTimestamp), - expectedOld: []*exp.ReplicaSet{rs("foo-v2", 1, nil, oldTimestamp), rs("foo-v1", 1, nil, olderTimestamp)}, - }, - // Scales down the older replica set. - { - name: "leftover distribution: 3 -> 2", - deployment: newDeployment(2, nil), - oldDeployment: newDeployment(3, nil), - - newRS: rs("foo-v3", 1, nil, newTimestamp), - oldRSs: []*exp.ReplicaSet{rs("foo-v2", 1, nil, oldTimestamp), rs("foo-v1", 1, nil, olderTimestamp)}, - - expectedNew: rs("foo-v3", 1, nil, newTimestamp), - expectedOld: []*exp.ReplicaSet{rs("foo-v2", 1, nil, oldTimestamp), rs("foo-v1", 0, nil, olderTimestamp)}, - }, - // Scales up the latest replica set first. - { - name: "proportional scaling (no new rs): 4 -> 5", - deployment: newDeployment(5, nil), - oldDeployment: newDeployment(4, nil), - - newRS: nil, - oldRSs: []*exp.ReplicaSet{rs("foo-v2", 2, nil, oldTimestamp), rs("foo-v1", 2, nil, olderTimestamp)}, - - expectedNew: nil, - expectedOld: []*exp.ReplicaSet{rs("foo-v2", 3, nil, oldTimestamp), rs("foo-v1", 2, nil, olderTimestamp)}, - }, - // Scales down to zero - { - name: "proportional scaling: 6 -> 0", - deployment: newDeployment(0, nil), - oldDeployment: newDeployment(6, nil), - - newRS: rs("foo-v3", 3, nil, newTimestamp), - oldRSs: []*exp.ReplicaSet{rs("foo-v2", 2, nil, oldTimestamp), rs("foo-v1", 1, nil, olderTimestamp)}, - - expectedNew: rs("foo-v3", 0, nil, newTimestamp), - expectedOld: []*exp.ReplicaSet{rs("foo-v2", 0, nil, oldTimestamp), rs("foo-v1", 0, nil, olderTimestamp)}, - }, - // Scales up from zero - { - name: "proportional scaling: 0 -> 6", - deployment: newDeployment(6, nil), - oldDeployment: newDeployment(0, nil), - - newRS: rs("foo-v3", 0, nil, newTimestamp), - oldRSs: []*exp.ReplicaSet{rs("foo-v2", 0, nil, oldTimestamp), rs("foo-v1", 0, nil, olderTimestamp)}, - - expectedNew: rs("foo-v3", 6, nil, newTimestamp), - expectedOld: []*exp.ReplicaSet{rs("foo-v2", 0, nil, oldTimestamp), rs("foo-v1", 0, nil, olderTimestamp)}, - }, - // Scenario: deployment.spec.replicas == 3 ( foo-v1.spec.replicas == foo-v2.spec.replicas == foo-v3.spec.replicas == 1 ) - // Deployment is scaled to 5. 
foo-v3.spec.replicas and foo-v2.spec.replicas should increment by 1 but foo-v2 fails to - // update. - { - name: "failed rs update", - deployment: newDeployment(5, nil), - oldDeployment: newDeployment(5, nil), - - newRS: rs("foo-v3", 2, nil, newTimestamp), - oldRSs: []*exp.ReplicaSet{rs("foo-v2", 1, nil, oldTimestamp), rs("foo-v1", 1, nil, olderTimestamp)}, - - expectedNew: rs("foo-v3", 2, nil, newTimestamp), - expectedOld: []*exp.ReplicaSet{rs("foo-v2", 2, nil, oldTimestamp), rs("foo-v1", 1, nil, olderTimestamp)}, - - desiredReplicasAnnotations: map[string]int32{"foo-v2": int32(3)}, - }, - { - name: "deployment with surge pods", - deployment: newDeploymentEnhanced(20, intstr.FromInt(2)), - oldDeployment: newDeploymentEnhanced(10, intstr.FromInt(2)), - - newRS: rs("foo-v2", 6, nil, newTimestamp), - oldRSs: []*exp.ReplicaSet{rs("foo-v1", 6, nil, oldTimestamp)}, - - expectedNew: rs("foo-v2", 11, nil, newTimestamp), - expectedOld: []*exp.ReplicaSet{rs("foo-v1", 11, nil, oldTimestamp)}, - }, - { - name: "change both surge and size", - deployment: newDeploymentEnhanced(50, intstr.FromInt(6)), - oldDeployment: newDeploymentEnhanced(10, intstr.FromInt(3)), - - newRS: rs("foo-v2", 5, nil, newTimestamp), - oldRSs: []*exp.ReplicaSet{rs("foo-v1", 8, nil, oldTimestamp)}, - - expectedNew: rs("foo-v2", 22, nil, newTimestamp), - expectedOld: []*exp.ReplicaSet{rs("foo-v1", 34, nil, oldTimestamp)}, - }, - } - - for _, test := range tests { - _ = olderTimestamp - t.Log(test.name) - fake := fake.Clientset{} - dc := &DeploymentController{ - client: &fake, - eventRecorder: &record.FakeRecorder{}, - } - - if test.newRS != nil { - desiredReplicas := test.oldDeployment.Spec.Replicas - if desired, ok := test.desiredReplicasAnnotations[test.newRS.Name]; ok { - desiredReplicas = desired - } - setReplicasAnnotations(test.newRS, desiredReplicas, desiredReplicas+maxSurge(*test.oldDeployment)) - } - for i := range test.oldRSs { - rs := test.oldRSs[i] - if rs == nil { - continue - } - desiredReplicas := test.oldDeployment.Spec.Replicas - if desired, ok := test.desiredReplicasAnnotations[rs.Name]; ok { - desiredReplicas = desired - } - setReplicasAnnotations(rs, desiredReplicas, desiredReplicas+maxSurge(*test.oldDeployment)) - } - - if err := dc.scale(test.deployment, test.newRS, test.oldRSs); err != nil { - t.Errorf("%s: unexpected error: %v", test.name, err) - continue - } - if test.expectedNew != nil && test.newRS != nil && test.expectedNew.Spec.Replicas != test.newRS.Spec.Replicas { - t.Errorf("%s: expected new replicas: %d, got: %d", test.name, test.expectedNew.Spec.Replicas, test.newRS.Spec.Replicas) - continue - } - if len(test.expectedOld) != len(test.oldRSs) { - t.Errorf("%s: expected %d old replica sets, got %d", test.name, len(test.expectedOld), len(test.oldRSs)) - continue - } - for n := range test.oldRSs { - rs := test.oldRSs[n] - exp := test.expectedOld[n] - if exp.Spec.Replicas != rs.Spec.Replicas { - t.Errorf("%s: expected old (%s) replicas: %d, got: %d", test.name, rs.Name, exp.Spec.Replicas, rs.Spec.Replicas) - } - } - } -} - -func TestDeploymentController_reconcileNewReplicaSet(t *testing.T) { - tests := []struct { - deploymentReplicas int - maxSurge intstr.IntOrString - oldReplicas int - newReplicas int - scaleExpected bool - expectedNewReplicas int - }{ - { - // Should not scale up. 
- deploymentReplicas: 10, - maxSurge: intstr.FromInt(0), - oldReplicas: 10, - newReplicas: 0, - scaleExpected: false, - }, - { - deploymentReplicas: 10, - maxSurge: intstr.FromInt(2), - oldReplicas: 10, - newReplicas: 0, - scaleExpected: true, - expectedNewReplicas: 2, - }, - { - deploymentReplicas: 10, - maxSurge: intstr.FromInt(2), - oldReplicas: 5, - newReplicas: 0, - scaleExpected: true, - expectedNewReplicas: 7, - }, - { - deploymentReplicas: 10, - maxSurge: intstr.FromInt(2), - oldReplicas: 10, - newReplicas: 2, - scaleExpected: false, - }, - { - // Should scale down. - deploymentReplicas: 10, - maxSurge: intstr.FromInt(2), - oldReplicas: 2, - newReplicas: 11, - scaleExpected: true, - expectedNewReplicas: 10, - }, - } - - for i, test := range tests { - t.Logf("executing scenario %d", i) - newRS := rs("foo-v2", test.newReplicas, nil, noTimestamp) - oldRS := rs("foo-v2", test.oldReplicas, nil, noTimestamp) - allRSs := []*exp.ReplicaSet{newRS, oldRS} - deployment := deployment("foo", test.deploymentReplicas, test.maxSurge, intstr.FromInt(0), nil) - fake := fake.Clientset{} - controller := &DeploymentController{ - client: &fake, - eventRecorder: &record.FakeRecorder{}, - } - scaled, err := controller.reconcileNewReplicaSet(allRSs, newRS, &deployment) - if err != nil { - t.Errorf("unexpected error: %v", err) - continue - } - if !test.scaleExpected { - if scaled || len(fake.Actions()) > 0 { - t.Errorf("unexpected scaling: %v", fake.Actions()) - } - continue - } - if test.scaleExpected && !scaled { - t.Errorf("expected scaling to occur") - continue - } - if len(fake.Actions()) != 1 { - t.Errorf("expected 1 action during scale, got: %v", fake.Actions()) - continue - } - updated := fake.Actions()[0].(core.UpdateAction).GetObject().(*exp.ReplicaSet) - if e, a := test.expectedNewReplicas, int(updated.Spec.Replicas); e != a { - t.Errorf("expected update to %d replicas, got %d", e, a) - } - } -} - -func TestDeploymentController_reconcileOldReplicaSets(t *testing.T) { - tests := []struct { - deploymentReplicas int - maxUnavailable intstr.IntOrString - oldReplicas int - newReplicas int - readyPodsFromOldRS int - readyPodsFromNewRS int - scaleExpected bool - expectedOldReplicas int - }{ - { - deploymentReplicas: 10, - maxUnavailable: intstr.FromInt(0), - oldReplicas: 10, - newReplicas: 0, - readyPodsFromOldRS: 10, - readyPodsFromNewRS: 0, - scaleExpected: true, - expectedOldReplicas: 9, - }, - { - deploymentReplicas: 10, - maxUnavailable: intstr.FromInt(2), - oldReplicas: 10, - newReplicas: 0, - readyPodsFromOldRS: 10, - readyPodsFromNewRS: 0, - scaleExpected: true, - expectedOldReplicas: 8, - }, - { // expect unhealthy replicas from old replica sets been cleaned up - deploymentReplicas: 10, - maxUnavailable: intstr.FromInt(2), - oldReplicas: 10, - newReplicas: 0, - readyPodsFromOldRS: 8, - readyPodsFromNewRS: 0, - scaleExpected: true, - expectedOldReplicas: 8, - }, - { // expect 1 unhealthy replica from old replica sets been cleaned up, and 1 ready pod been scaled down - deploymentReplicas: 10, - maxUnavailable: intstr.FromInt(2), - oldReplicas: 10, - newReplicas: 0, - readyPodsFromOldRS: 9, - readyPodsFromNewRS: 0, - scaleExpected: true, - expectedOldReplicas: 8, - }, - { // the unavailable pods from the newRS would not make us scale down old RSs in a further step - deploymentReplicas: 10, - maxUnavailable: intstr.FromInt(2), - oldReplicas: 8, - newReplicas: 2, - readyPodsFromOldRS: 8, - readyPodsFromNewRS: 0, - scaleExpected: false, - }, - } - for i, test := range tests { - t.Logf("executing 
scenario %d", i) - - newSelector := map[string]string{"foo": "new"} - oldSelector := map[string]string{"foo": "old"} - newRS := rs("foo-new", test.newReplicas, newSelector, noTimestamp) - oldRS := rs("foo-old", test.oldReplicas, oldSelector, noTimestamp) - oldRSs := []*exp.ReplicaSet{oldRS} - allRSs := []*exp.ReplicaSet{oldRS, newRS} - - deployment := deployment("foo", test.deploymentReplicas, intstr.FromInt(0), test.maxUnavailable, newSelector) - fakeClientset := fake.Clientset{} - fakeClientset.AddReactor("list", "pods", func(action core.Action) (handled bool, ret runtime.Object, err error) { - switch action.(type) { - case core.ListAction: - podList := &api.PodList{} - for podIndex := 0; podIndex < test.readyPodsFromOldRS; podIndex++ { - podList.Items = append(podList.Items, api.Pod{ - ObjectMeta: api.ObjectMeta{ - Name: fmt.Sprintf("%s-oldReadyPod-%d", oldRS.Name, podIndex), - Labels: oldSelector, - }, - Status: api.PodStatus{ - Conditions: []api.PodCondition{ - { - Type: api.PodReady, - Status: api.ConditionTrue, - }, - }, - }, - }) - } - for podIndex := 0; podIndex < test.oldReplicas-test.readyPodsFromOldRS; podIndex++ { - podList.Items = append(podList.Items, api.Pod{ - ObjectMeta: api.ObjectMeta{ - Name: fmt.Sprintf("%s-oldUnhealthyPod-%d", oldRS.Name, podIndex), - Labels: oldSelector, - }, - Status: api.PodStatus{ - Conditions: []api.PodCondition{ - { - Type: api.PodReady, - Status: api.ConditionFalse, - }, - }, - }, - }) - } - for podIndex := 0; podIndex < test.readyPodsFromNewRS; podIndex++ { - podList.Items = append(podList.Items, api.Pod{ - ObjectMeta: api.ObjectMeta{ - Name: fmt.Sprintf("%s-newReadyPod-%d", oldRS.Name, podIndex), - Labels: newSelector, - }, - Status: api.PodStatus{ - Conditions: []api.PodCondition{ - { - Type: api.PodReady, - Status: api.ConditionTrue, - }, - }, - }, - }) - } - for podIndex := 0; podIndex < test.oldReplicas-test.readyPodsFromOldRS; podIndex++ { - podList.Items = append(podList.Items, api.Pod{ - ObjectMeta: api.ObjectMeta{ - Name: fmt.Sprintf("%s-newUnhealthyPod-%d", oldRS.Name, podIndex), - Labels: newSelector, - }, - Status: api.PodStatus{ - Conditions: []api.PodCondition{ - { - Type: api.PodReady, - Status: api.ConditionFalse, - }, - }, - }, - }) - } - return true, podList, nil - } - return false, nil, nil - }) - controller := &DeploymentController{ - client: &fakeClientset, - eventRecorder: &record.FakeRecorder{}, - } - - scaled, err := controller.reconcileOldReplicaSets(allRSs, oldRSs, newRS, &deployment) - if err != nil { - t.Errorf("unexpected error: %v", err) - continue - } - if !test.scaleExpected && scaled { - t.Errorf("unexpected scaling: %v", fakeClientset.Actions()) - } - if test.scaleExpected && !scaled { - t.Errorf("expected scaling to occur") - continue - } - continue - } -} - -func TestDeploymentController_cleanupUnhealthyReplicas(t *testing.T) { - tests := []struct { - oldReplicas int - readyPods int - unHealthyPods int - maxCleanupCount int - cleanupCountExpected int - }{ - { - oldReplicas: 10, - readyPods: 8, - unHealthyPods: 2, - maxCleanupCount: 1, - cleanupCountExpected: 1, - }, - { - oldReplicas: 10, - readyPods: 8, - unHealthyPods: 2, - maxCleanupCount: 3, - cleanupCountExpected: 2, - }, - { - oldReplicas: 10, - readyPods: 8, - unHealthyPods: 2, - maxCleanupCount: 0, - cleanupCountExpected: 0, - }, - { - oldReplicas: 10, - readyPods: 10, - unHealthyPods: 0, - maxCleanupCount: 3, - cleanupCountExpected: 0, - }, - } - - for i, test := range tests { - t.Logf("executing scenario %d", i) - oldRS := rs("foo-v2", 
test.oldReplicas, nil, noTimestamp) - oldRSs := []*exp.ReplicaSet{oldRS} - deployment := deployment("foo", 10, intstr.FromInt(2), intstr.FromInt(2), nil) - fakeClientset := fake.Clientset{} - fakeClientset.AddReactor("list", "pods", func(action core.Action) (handled bool, ret runtime.Object, err error) { - switch action.(type) { - case core.ListAction: - podList := &api.PodList{} - for podIndex := 0; podIndex < test.readyPods; podIndex++ { - podList.Items = append(podList.Items, api.Pod{ - ObjectMeta: api.ObjectMeta{ - Name: fmt.Sprintf("%s-readyPod-%d", oldRS.Name, podIndex), - }, - Status: api.PodStatus{ - Conditions: []api.PodCondition{ - { - Type: api.PodReady, - Status: api.ConditionTrue, - }, - }, - }, - }) - } - for podIndex := 0; podIndex < test.unHealthyPods; podIndex++ { - podList.Items = append(podList.Items, api.Pod{ - ObjectMeta: api.ObjectMeta{ - Name: fmt.Sprintf("%s-unHealthyPod-%d", oldRS.Name, podIndex), - }, - Status: api.PodStatus{ - Conditions: []api.PodCondition{ - { - Type: api.PodReady, - Status: api.ConditionFalse, - }, - }, - }, - }) - } - return true, podList, nil - } - return false, nil, nil - }) - - controller := &DeploymentController{ - client: &fakeClientset, - eventRecorder: &record.FakeRecorder{}, - } - _, cleanupCount, err := controller.cleanupUnhealthyReplicas(oldRSs, &deployment, 0, int32(test.maxCleanupCount)) - if err != nil { - t.Errorf("unexpected error: %v", err) - continue - } - if int(cleanupCount) != test.cleanupCountExpected { - t.Errorf("expected %v unhealthy replicas been cleaned up, got %v", test.cleanupCountExpected, cleanupCount) - continue - } - } -} - -func TestDeploymentController_scaleDownOldReplicaSetsForRollingUpdate(t *testing.T) { - tests := []struct { - deploymentReplicas int - maxUnavailable intstr.IntOrString - readyPods int - oldReplicas int - scaleExpected bool - expectedOldReplicas int - }{ - { - deploymentReplicas: 10, - maxUnavailable: intstr.FromInt(0), - readyPods: 10, - oldReplicas: 10, - scaleExpected: true, - expectedOldReplicas: 9, - }, - { - deploymentReplicas: 10, - maxUnavailable: intstr.FromInt(2), - readyPods: 10, - oldReplicas: 10, - scaleExpected: true, - expectedOldReplicas: 8, - }, - { - deploymentReplicas: 10, - maxUnavailable: intstr.FromInt(2), - readyPods: 8, - oldReplicas: 10, - scaleExpected: false, - }, - { - deploymentReplicas: 10, - maxUnavailable: intstr.FromInt(2), - readyPods: 10, - oldReplicas: 0, - scaleExpected: false, - }, - { - deploymentReplicas: 10, - maxUnavailable: intstr.FromInt(2), - readyPods: 1, - oldReplicas: 10, - scaleExpected: false, - }, - } - - for i, test := range tests { - t.Logf("executing scenario %d", i) - oldRS := rs("foo-v2", test.oldReplicas, nil, noTimestamp) - allRSs := []*exp.ReplicaSet{oldRS} - oldRSs := []*exp.ReplicaSet{oldRS} - deployment := deployment("foo", test.deploymentReplicas, intstr.FromInt(0), test.maxUnavailable, map[string]string{"foo": "bar"}) - fakeClientset := fake.Clientset{} - fakeClientset.AddReactor("list", "pods", func(action core.Action) (handled bool, ret runtime.Object, err error) { - switch action.(type) { - case core.ListAction: - podList := &api.PodList{} - for podIndex := 0; podIndex < test.readyPods; podIndex++ { - podList.Items = append(podList.Items, api.Pod{ - ObjectMeta: api.ObjectMeta{ - Name: fmt.Sprintf("%s-pod-%d", oldRS.Name, podIndex), - Labels: map[string]string{"foo": "bar"}, - }, - Status: api.PodStatus{ - Conditions: []api.PodCondition{ - { - Type: api.PodReady, - Status: api.ConditionTrue, - }, - }, - }, - }) - } - return 
true, podList, nil - } - return false, nil, nil - }) - controller := &DeploymentController{ - client: &fakeClientset, - eventRecorder: &record.FakeRecorder{}, - } - scaled, err := controller.scaleDownOldReplicaSetsForRollingUpdate(allRSs, oldRSs, &deployment) - if err != nil { - t.Errorf("unexpected error: %v", err) - continue - } - if !test.scaleExpected { - if scaled != 0 { - t.Errorf("unexpected scaling: %v", fakeClientset.Actions()) - } - continue - } - if test.scaleExpected && scaled == 0 { - t.Errorf("expected scaling to occur; actions: %v", fakeClientset.Actions()) - continue - } - // There are both list and update actions logged, so extract the update - // action for verification. - var updateAction core.UpdateAction - for _, action := range fakeClientset.Actions() { - switch a := action.(type) { - case core.UpdateAction: - if updateAction != nil { - t.Errorf("expected only 1 update action; had %v and found %v", updateAction, a) - } else { - updateAction = a - } - } - } - if updateAction == nil { - t.Errorf("expected an update action") - continue - } - updated := updateAction.GetObject().(*exp.ReplicaSet) - if e, a := test.expectedOldReplicas, int(updated.Spec.Replicas); e != a { - t.Errorf("expected update to %d replicas, got %d", e, a) - } - } -} - -func TestDeploymentController_cleanupDeployment(t *testing.T) { - selector := map[string]string{"foo": "bar"} - - tests := []struct { - oldRSs []*exp.ReplicaSet - revisionHistoryLimit int - expectedDeletions int - }{ - { - oldRSs: []*exp.ReplicaSet{ - newRSWithStatus("foo-1", 0, 0, selector), - newRSWithStatus("foo-2", 0, 0, selector), - newRSWithStatus("foo-3", 0, 0, selector), - }, - revisionHistoryLimit: 1, - expectedDeletions: 2, - }, - { - // Only delete the replica set with Spec.Replicas = Status.Replicas = 0. 
- oldRSs: []*exp.ReplicaSet{ - newRSWithStatus("foo-1", 0, 0, selector), - newRSWithStatus("foo-2", 0, 1, selector), - newRSWithStatus("foo-3", 1, 0, selector), - newRSWithStatus("foo-4", 1, 1, selector), - }, - revisionHistoryLimit: 0, - expectedDeletions: 1, - }, - - { - oldRSs: []*exp.ReplicaSet{ - newRSWithStatus("foo-1", 0, 0, selector), - newRSWithStatus("foo-2", 0, 0, selector), - }, - revisionHistoryLimit: 0, - expectedDeletions: 2, - }, - { - oldRSs: []*exp.ReplicaSet{ - newRSWithStatus("foo-1", 1, 1, selector), - newRSWithStatus("foo-2", 1, 1, selector), - }, - revisionHistoryLimit: 0, - expectedDeletions: 0, - }, - } - - for i, test := range tests { - fake := &fake.Clientset{} - controller := NewDeploymentController(fake, controller.NoResyncPeriodFunc) - - controller.eventRecorder = &record.FakeRecorder{} - controller.rsStoreSynced = alwaysReady - controller.podStoreSynced = alwaysReady - for _, rs := range test.oldRSs { - controller.rsStore.Add(rs) - } - - d := newDeployment(1, &tests[i].revisionHistoryLimit) - controller.cleanupDeployment(test.oldRSs, d) - - gotDeletions := 0 - for _, action := range fake.Actions() { - if "delete" == action.GetVerb() { - gotDeletions++ - } - } - if gotDeletions != test.expectedDeletions { - t.Errorf("expect %v old replica sets been deleted, but got %v", test.expectedDeletions, gotDeletions) - continue - } - } -} - func getKey(d *exp.Deployment, t *testing.T) string { if key, err := controller.KeyFunc(d); err != nil { t.Errorf("Unexpected error getting key for deployment %v: %v", d.Name, err) diff --git a/pkg/controller/deployment/recreate.go b/pkg/controller/deployment/recreate.go new file mode 100644 index 00000000000..ccd968348ca --- /dev/null +++ b/pkg/controller/deployment/recreate.go @@ -0,0 +1,92 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package deployment + +import ( + "k8s.io/kubernetes/pkg/apis/extensions" + "k8s.io/kubernetes/pkg/controller" +) + +// rolloutRecreate implements the logic for recreating a replica set. +func (dc *DeploymentController) rolloutRecreate(deployment *extensions.Deployment) error { + // Don't create a new RS if not already existed, so that we avoid scaling up before scaling down + newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(deployment, false) + if err != nil { + return err + } + allRSs := append(oldRSs, newRS) + + // scale down old replica sets + scaledDown, err := dc.scaleDownOldReplicaSetsForRecreate(controller.FilterActiveReplicaSets(oldRSs), deployment) + if err != nil { + return err + } + if scaledDown { + // Update DeploymentStatus + return dc.updateDeploymentStatus(allRSs, newRS, deployment) + } + + // If we need to create a new RS, create it now + // TODO: Create a new RS without re-listing all RSs. 
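+	// Clarifying note on ordering (illustrative sequence, hypothetical numbers):
+	// reaching this point means scaleDownOldReplicaSetsForRecreate reported no
+	// work to do, i.e. every active old replica set is already at zero replicas,
+	// so creating and scaling up the new replica set below cannot run new pods
+	// alongside old ones. For a 3-replica deployment, one sync scales the old
+	// RS 3 -> 0 and returns to update status; only a later sync creates the new
+	// RS and scales it 0 -> 3.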
+ if newRS == nil { + newRS, oldRSs, err = dc.getAllReplicaSetsAndSyncRevision(deployment, true) + if err != nil { + return err + } + allRSs = append(oldRSs, newRS) + } + + // scale up new replica set + scaledUp, err := dc.scaleUpNewReplicaSetForRecreate(newRS, deployment) + if err != nil { + return err + } + if scaledUp { + // Update DeploymentStatus + return dc.updateDeploymentStatus(allRSs, newRS, deployment) + } + + dc.cleanupDeployment(oldRSs, deployment) + + // Sync deployment status + return dc.syncDeploymentStatus(allRSs, newRS, deployment) +} + +// scaleDownOldReplicaSetsForRecreate scales down old replica sets when deployment strategy is "Recreate" +func (dc *DeploymentController) scaleDownOldReplicaSetsForRecreate(oldRSs []*extensions.ReplicaSet, deployment *extensions.Deployment) (bool, error) { + scaled := false + for _, rs := range oldRSs { + // Scaling not required. + if rs.Spec.Replicas == 0 { + continue + } + scaledRS, _, err := dc.scaleReplicaSetAndRecordEvent(rs, 0, deployment) + if err != nil { + return false, err + } + if scaledRS { + scaled = true + } + } + return scaled, nil +} + +// scaleUpNewReplicaSetForRecreate scales up new replica set when deployment strategy is "Recreate" +func (dc *DeploymentController) scaleUpNewReplicaSetForRecreate(newRS *extensions.ReplicaSet, deployment *extensions.Deployment) (bool, error) { + scaled, _, err := dc.scaleReplicaSetAndRecordEvent(newRS, deployment.Spec.Replicas, deployment) + return scaled, err +} diff --git a/pkg/controller/deployment/rollback.go b/pkg/controller/deployment/rollback.go new file mode 100644 index 00000000000..aabfbc42ad8 --- /dev/null +++ b/pkg/controller/deployment/rollback.go @@ -0,0 +1,105 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/
+
+package deployment
+
+import (
+	"fmt"
+	"reflect"
+
+	"github.com/golang/glog"
+	"k8s.io/kubernetes/pkg/api"
+	"k8s.io/kubernetes/pkg/apis/extensions"
+	deploymentutil "k8s.io/kubernetes/pkg/util/deployment"
+)
+
+// rollback rolls a deployment back to the given revision; it is a no-op if toRevision is the deployment's current revision.
+func (dc *DeploymentController) rollback(deployment *extensions.Deployment, toRevision *int64) (*extensions.Deployment, error) {
+	newRS, allOldRSs, err := dc.getAllReplicaSetsAndSyncRevision(deployment, true)
+	if err != nil {
+		return nil, err
+	}
+	allRSs := append(allOldRSs, newRS)
+	// If the rollback revision is 0, roll back to the last revision
+	if *toRevision == 0 {
+		if *toRevision = lastRevision(allRSs); *toRevision == 0 {
+			// If we still can't find the last revision, give up on the rollback
+			dc.emitRollbackWarningEvent(deployment, deploymentutil.RollbackRevisionNotFound, "Unable to find last revision.")
+			// Give up the rollback
+			return dc.updateDeploymentAndClearRollbackTo(deployment)
+		}
+	}
+	for _, rs := range allRSs {
+		v, err := deploymentutil.Revision(rs)
+		if err != nil {
+			glog.V(4).Infof("Unable to extract revision from deployment's replica set %q: %v", rs.Name, err)
+			continue
+		}
+		if v == *toRevision {
+			glog.V(4).Infof("Found replica set %q with desired revision %d", rs.Name, v)
+			// Roll back by copying podTemplate.Spec from the replica set and incrementing the revision number by 1;
+			// this is a no-op if the spec matches the current deployment's podTemplate.Spec
+			deployment, performedRollback, err := dc.rollbackToTemplate(deployment, rs)
+			if performedRollback && err == nil {
+				dc.emitRollbackNormalEvent(deployment, fmt.Sprintf("Rolled back deployment %q to revision %d", deployment.Name, *toRevision))
+			}
+			return deployment, err
+		}
+	}
+	dc.emitRollbackWarningEvent(deployment, deploymentutil.RollbackRevisionNotFound, "Unable to find the revision to roll back to.")
+	// Give up the rollback
+	return dc.updateDeploymentAndClearRollbackTo(deployment)
+}
+
+func (dc *DeploymentController) rollbackToTemplate(deployment *extensions.Deployment, rs *extensions.ReplicaSet) (d *extensions.Deployment, performedRollback bool, err error) {
+	if !reflect.DeepEqual(deploymentutil.GetNewReplicaSetTemplate(deployment), rs.Spec.Template) {
+		glog.Infof("Rolling back deployment %s to template spec %+v", deployment.Name, rs.Spec.Template.Spec)
+		deploymentutil.SetFromReplicaSetTemplate(deployment, rs.Spec.Template)
+		// Set the annotations of the RS (the old RS we're rolling back to) back on the deployment;
+		// otherwise, the deployment's current annotations (which should match the current new RS) will be copied to the RS after the rollback.
+		//
+		// For example,
+		// a Deployment has old RS1 with annotation {change-cause:create}, and new RS2 {change-cause:edit}.
+		// Note that both annotations are copied from the Deployment, and the Deployment should be annotated {change-cause:edit} as well.
+		// Now, when rolling back the Deployment to RS1, we should update the Deployment's pod template and also copy the annotation from RS1.
+		// The Deployment is then annotated {change-cause:create}, and we have new RS1 {change-cause:create}, old RS2 {change-cause:edit}.
+		//
+		// If we don't copy the annotations back from the RS to the deployment on rollback, the Deployment will stay as {change-cause:edit},
+		// and new RS1 becomes {change-cause:edit} (copied from the deployment after rollback), old RS2 {change-cause:edit}, which is not correct.
+ setDeploymentAnnotationsTo(deployment, rs) + performedRollback = true + } else { + glog.V(4).Infof("Rolling back to a revision that contains the same template as current deployment %s, skipping rollback...", deployment.Name) + dc.emitRollbackWarningEvent(deployment, deploymentutil.RollbackTemplateUnchanged, fmt.Sprintf("The rollback revision contains the same template as current deployment %q", deployment.Name)) + } + d, err = dc.updateDeploymentAndClearRollbackTo(deployment) + return +} + +func (dc *DeploymentController) emitRollbackWarningEvent(deployment *extensions.Deployment, reason, message string) { + dc.eventRecorder.Eventf(deployment, api.EventTypeWarning, reason, message) +} + +func (dc *DeploymentController) emitRollbackNormalEvent(deployment *extensions.Deployment, message string) { + dc.eventRecorder.Eventf(deployment, api.EventTypeNormal, deploymentutil.RollbackDone, message) +} + +// updateDeploymentAndClearRollbackTo sets .spec.rollbackTo to nil and update the input deployment +func (dc *DeploymentController) updateDeploymentAndClearRollbackTo(deployment *extensions.Deployment) (*extensions.Deployment, error) { + glog.V(4).Infof("Cleans up rollbackTo of deployment %s", deployment.Name) + deployment.Spec.RollbackTo = nil + return dc.client.Extensions().Deployments(deployment.ObjectMeta.Namespace).Update(deployment) +} diff --git a/pkg/controller/deployment/rolling.go b/pkg/controller/deployment/rolling.go new file mode 100644 index 00000000000..4708dcf9b7a --- /dev/null +++ b/pkg/controller/deployment/rolling.go @@ -0,0 +1,243 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package deployment + +import ( + "fmt" + "sort" + + "github.com/golang/glog" + "k8s.io/kubernetes/pkg/apis/extensions" + "k8s.io/kubernetes/pkg/controller" + deploymentutil "k8s.io/kubernetes/pkg/util/deployment" + "k8s.io/kubernetes/pkg/util/integer" +) + +// rolloutRolling implements the logic for rolling a new replica set. +func (dc *DeploymentController) rolloutRolling(deployment *extensions.Deployment) error { + newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(deployment, true) + if err != nil { + return err + } + allRSs := append(oldRSs, newRS) + + // Scale up, if we can. + scaledUp, err := dc.reconcileNewReplicaSet(allRSs, newRS, deployment) + if err != nil { + return err + } + if scaledUp { + // Update DeploymentStatus + return dc.updateDeploymentStatus(allRSs, newRS, deployment) + } + + // Scale down, if we can. 
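+	// How far we can scale down here is bounded by maxUnavailable and by any
+	// unavailable pods in the new RS; see the availability math at the top of
+	// reconcileOldReplicaSets below.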
+	scaledDown, err := dc.reconcileOldReplicaSets(allRSs, controller.FilterActiveReplicaSets(oldRSs), newRS, deployment)
+	if err != nil {
+		return err
+	}
+	if scaledDown {
+		// Update DeploymentStatus
+		return dc.updateDeploymentStatus(allRSs, newRS, deployment)
+	}
+
+	dc.cleanupDeployment(oldRSs, deployment)
+
+	// Sync deployment status
+	return dc.syncDeploymentStatus(allRSs, newRS, deployment)
+}
+
+func (dc *DeploymentController) reconcileNewReplicaSet(allRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, deployment *extensions.Deployment) (bool, error) {
+	if newRS.Spec.Replicas == deployment.Spec.Replicas {
+		// Scaling not required.
+		return false, nil
+	}
+	if newRS.Spec.Replicas > deployment.Spec.Replicas {
+		// Scale down.
+		scaled, _, err := dc.scaleReplicaSetAndRecordEvent(newRS, deployment.Spec.Replicas, deployment)
+		return scaled, err
+	}
+	newReplicasCount, err := deploymentutil.NewRSNewReplicas(deployment, allRSs, newRS)
+	if err != nil {
+		return false, err
+	}
+	scaled, _, err := dc.scaleReplicaSetAndRecordEvent(newRS, newReplicasCount, deployment)
+	return scaled, err
+}
+
+func (dc *DeploymentController) reconcileOldReplicaSets(allRSs []*extensions.ReplicaSet, oldRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, deployment *extensions.Deployment) (bool, error) {
+	oldPodsCount := deploymentutil.GetReplicaCountForReplicaSets(oldRSs)
+	if oldPodsCount == 0 {
+		// Can't scale down further
+		return false, nil
+	}
+
+	minReadySeconds := deployment.Spec.MinReadySeconds
+	allPodsCount := deploymentutil.GetReplicaCountForReplicaSets(allRSs)
+	// TODO: use dc.getAvailablePodsForReplicaSets instead
+	newRSAvailablePodCount, err := deploymentutil.GetAvailablePodsForReplicaSets(dc.client, deployment, []*extensions.ReplicaSet{newRS}, minReadySeconds)
+	if err != nil {
+		return false, fmt.Errorf("could not find available pods: %v", err)
+	}
+	maxUnavailable := maxUnavailable(*deployment)
+
+	// Check if we can scale down. We can scale down in the following 2 cases:
+	// * Some old replica sets have unhealthy replicas; we can safely scale down those unhealthy replicas since that won't further
+	//   increase unavailability.
+	// * The new replica set has scaled up and its replicas become ready, in which case we can scale down old replica sets in a further step.
+	//
+	// maxScaledDown := allPodsCount - minAvailable - newReplicaSetPodsUnavailable
+	// This takes into account not only maxUnavailable and any surge pods that have been created, but also unavailable pods from
+	// the newRS, so that unavailable pods from the newRS don't make us scale down old replica sets in a further
+	// step (which would increase unavailability).
+	//
+	// Concrete example:
+	//
+	// * 10 replicas
+	// * 2 maxUnavailable (absolute number, not percent)
+	// * 3 maxSurge (absolute number, not percent)
+	//
+	// case 1:
+	// * Deployment is updated, newRS is created with 3 replicas, oldRS is scaled down to 8, and newRS is scaled up to 5.
+	// * The new replica set pods crashloop and never become available.
+	// * allPodsCount is 13. minAvailable is 8. newRSPodsUnavailable is 5.
+	// * A node fails and causes one of the oldRS pods to become unavailable. However, 13 - 8 - 5 = 0, so the oldRS won't be scaled down.
+	// * The user notices the crashloop and runs kubectl rollout undo to roll back.
+	// * newRSPodsUnavailable is 1, since we rolled back to the good replica set, so maxScaledDown = 13 - 8 - 1 = 4. 4 of the crashlooping pods will be scaled down.
+	// * The total number of pods will then be 9 and the newRS can be scaled up to 10.
+	//
+	// case 2:
+	// Same example, but pushing a new pod template instead of rolling back (aka "roll over"):
+	// * The new replica set created must start with 0 replicas because allPodsCount is already at 13.
+	// * However, newRSPodsUnavailable would also be 0, so the 2 old replica sets could be scaled down by 5 (13 - 8 - 0), which would then
+	//   allow the new replica set to be scaled up by 5.
+	minAvailable := deployment.Spec.Replicas - maxUnavailable
+	newRSUnavailablePodCount := newRS.Spec.Replicas - newRSAvailablePodCount
+	maxScaledDown := allPodsCount - minAvailable - newRSUnavailablePodCount
+	if maxScaledDown <= 0 {
+		return false, nil
+	}
+
+	// Clean up unhealthy replicas first; otherwise unhealthy replicas will block the deployment
+	// and cause a timeout. See https://github.com/kubernetes/kubernetes/issues/16737
+	oldRSs, cleanupCount, err := dc.cleanupUnhealthyReplicas(oldRSs, deployment, deployment.Spec.MinReadySeconds, maxScaledDown)
+	if err != nil {
+		return false, err
+	}
+	glog.V(4).Infof("Cleaned up %d unhealthy replicas from old RSes", cleanupCount)
+
+	// Scale down old replica sets; we need to check maxUnavailable to ensure we can scale down safely
+	allRSs = append(oldRSs, newRS)
+	scaledDownCount, err := dc.scaleDownOldReplicaSetsForRollingUpdate(allRSs, oldRSs, deployment)
+	if err != nil {
+		return false, err
+	}
+	glog.V(4).Infof("Scaled down old RSes of deployment %s by %d", deployment.Name, scaledDownCount)
+
+	totalScaledDown := cleanupCount + scaledDownCount
+	return totalScaledDown > 0, nil
+}
+
+// cleanupUnhealthyReplicas will scale down old replica sets with unhealthy replicas, so that all unhealthy replicas will be deleted.
+func (dc *DeploymentController) cleanupUnhealthyReplicas(oldRSs []*extensions.ReplicaSet, deployment *extensions.Deployment, minReadySeconds, maxCleanupCount int32) ([]*extensions.ReplicaSet, int32, error) {
+	sort.Sort(controller.ReplicaSetsByCreationTimestamp(oldRSs))
+	// Safely scale down all old replica sets with unhealthy replicas. The replica set controller sorts the pods
+	// such that not-ready < ready, unscheduled < scheduled, and pending < running. This ensures that unhealthy replicas will
+	// be deleted first and won't increase unavailability.
+	totalScaledDown := int32(0)
+	for i, targetRS := range oldRSs {
+		if totalScaledDown >= maxCleanupCount {
+			break
+		}
+		if targetRS.Spec.Replicas == 0 {
+			// Cannot scale down this replica set.
+			continue
+		}
+		// TODO: use dc.getAvailablePodsForReplicaSets instead
+		availablePodCount, err := deploymentutil.GetAvailablePodsForReplicaSets(dc.client, deployment, []*extensions.ReplicaSet{targetRS}, minReadySeconds)
+		if err != nil {
+			return nil, totalScaledDown, fmt.Errorf("could not find available pods: %v", err)
+		}
+		if targetRS.Spec.Replicas == availablePodCount {
+			// No unhealthy replicas found, no scaling required.
+ continue + } + + scaledDownCount := int32(integer.IntMin(int(maxCleanupCount-totalScaledDown), int(targetRS.Spec.Replicas-availablePodCount))) + newReplicasCount := targetRS.Spec.Replicas - scaledDownCount + if newReplicasCount > targetRS.Spec.Replicas { + return nil, 0, fmt.Errorf("when cleaning up unhealthy replicas, got invalid request to scale down %s/%s %d -> %d", targetRS.Namespace, targetRS.Name, targetRS.Spec.Replicas, newReplicasCount) + } + _, updatedOldRS, err := dc.scaleReplicaSetAndRecordEvent(targetRS, newReplicasCount, deployment) + if err != nil { + return nil, totalScaledDown, err + } + totalScaledDown += scaledDownCount + oldRSs[i] = updatedOldRS + } + return oldRSs, totalScaledDown, nil +} + +// scaleDownOldReplicaSetsForRollingUpdate scales down old replica sets when deployment strategy is "RollingUpdate". +// Need check maxUnavailable to ensure availability +func (dc *DeploymentController) scaleDownOldReplicaSetsForRollingUpdate(allRSs []*extensions.ReplicaSet, oldRSs []*extensions.ReplicaSet, deployment *extensions.Deployment) (int32, error) { + maxUnavailable := maxUnavailable(*deployment) + + // Check if we can scale down. + minAvailable := deployment.Spec.Replicas - maxUnavailable + minReadySeconds := deployment.Spec.MinReadySeconds + // Find the number of ready pods. + // TODO: use dc.getAvailablePodsForReplicaSets instead + availablePodCount, err := deploymentutil.GetAvailablePodsForReplicaSets(dc.client, deployment, allRSs, minReadySeconds) + if err != nil { + return 0, fmt.Errorf("could not find available pods: %v", err) + } + if availablePodCount <= minAvailable { + // Cannot scale down. + return 0, nil + } + glog.V(4).Infof("Found %d available pods in deployment %s, scaling down old RSes", availablePodCount, deployment.Name) + + sort.Sort(controller.ReplicaSetsByCreationTimestamp(oldRSs)) + + totalScaledDown := int32(0) + totalScaleDownCount := availablePodCount - minAvailable + for _, targetRS := range oldRSs { + if totalScaledDown >= totalScaleDownCount { + // No further scaling required. + break + } + if targetRS.Spec.Replicas == 0 { + // cannot scale down this ReplicaSet. + continue + } + // Scale down. + scaleDownCount := int32(integer.IntMin(int(targetRS.Spec.Replicas), int(totalScaleDownCount-totalScaledDown))) + newReplicasCount := targetRS.Spec.Replicas - scaleDownCount + if newReplicasCount > targetRS.Spec.Replicas { + return 0, fmt.Errorf("when scaling down old RS, got invalid request to scale down %s/%s %d -> %d", targetRS.Namespace, targetRS.Name, targetRS.Spec.Replicas, newReplicasCount) + } + _, _, err = dc.scaleReplicaSetAndRecordEvent(targetRS, newReplicasCount, deployment) + if err != nil { + return totalScaledDown, err + } + + totalScaledDown += scaleDownCount + } + + return totalScaledDown, nil +} diff --git a/pkg/controller/deployment/rolling_test.go b/pkg/controller/deployment/rolling_test.go new file mode 100644 index 00000000000..5407a1e2c72 --- /dev/null +++ b/pkg/controller/deployment/rolling_test.go @@ -0,0 +1,505 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +package deployment + +import ( + "fmt" + "testing" + + "k8s.io/kubernetes/pkg/api" + exp "k8s.io/kubernetes/pkg/apis/extensions" + "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" + "k8s.io/kubernetes/pkg/client/record" + "k8s.io/kubernetes/pkg/client/testing/core" + "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/util/intstr" +) + +func TestDeploymentController_reconcileNewReplicaSet(t *testing.T) { + tests := []struct { + deploymentReplicas int + maxSurge intstr.IntOrString + oldReplicas int + newReplicas int + scaleExpected bool + expectedNewReplicas int + }{ + { + // Should not scale up. + deploymentReplicas: 10, + maxSurge: intstr.FromInt(0), + oldReplicas: 10, + newReplicas: 0, + scaleExpected: false, + }, + { + deploymentReplicas: 10, + maxSurge: intstr.FromInt(2), + oldReplicas: 10, + newReplicas: 0, + scaleExpected: true, + expectedNewReplicas: 2, + }, + { + deploymentReplicas: 10, + maxSurge: intstr.FromInt(2), + oldReplicas: 5, + newReplicas: 0, + scaleExpected: true, + expectedNewReplicas: 7, + }, + { + deploymentReplicas: 10, + maxSurge: intstr.FromInt(2), + oldReplicas: 10, + newReplicas: 2, + scaleExpected: false, + }, + { + // Should scale down. + deploymentReplicas: 10, + maxSurge: intstr.FromInt(2), + oldReplicas: 2, + newReplicas: 11, + scaleExpected: true, + expectedNewReplicas: 10, + }, + } + + for i, test := range tests { + t.Logf("executing scenario %d", i) + newRS := rs("foo-v2", test.newReplicas, nil, noTimestamp) + oldRS := rs("foo-v2", test.oldReplicas, nil, noTimestamp) + allRSs := []*exp.ReplicaSet{newRS, oldRS} + deployment := deployment("foo", test.deploymentReplicas, test.maxSurge, intstr.FromInt(0), nil) + fake := fake.Clientset{} + controller := &DeploymentController{ + client: &fake, + eventRecorder: &record.FakeRecorder{}, + } + scaled, err := controller.reconcileNewReplicaSet(allRSs, newRS, &deployment) + if err != nil { + t.Errorf("unexpected error: %v", err) + continue + } + if !test.scaleExpected { + if scaled || len(fake.Actions()) > 0 { + t.Errorf("unexpected scaling: %v", fake.Actions()) + } + continue + } + if test.scaleExpected && !scaled { + t.Errorf("expected scaling to occur") + continue + } + if len(fake.Actions()) != 1 { + t.Errorf("expected 1 action during scale, got: %v", fake.Actions()) + continue + } + updated := fake.Actions()[0].(core.UpdateAction).GetObject().(*exp.ReplicaSet) + if e, a := test.expectedNewReplicas, int(updated.Spec.Replicas); e != a { + t.Errorf("expected update to %d replicas, got %d", e, a) + } + } +} + +func TestDeploymentController_reconcileOldReplicaSets(t *testing.T) { + tests := []struct { + deploymentReplicas int + maxUnavailable intstr.IntOrString + oldReplicas int + newReplicas int + readyPodsFromOldRS int + readyPodsFromNewRS int + scaleExpected bool + expectedOldReplicas int + }{ + { + deploymentReplicas: 10, + maxUnavailable: intstr.FromInt(0), + oldReplicas: 10, + newReplicas: 0, + readyPodsFromOldRS: 10, + readyPodsFromNewRS: 0, + scaleExpected: true, + expectedOldReplicas: 9, + }, + { + deploymentReplicas: 10, + maxUnavailable: intstr.FromInt(2), + oldReplicas: 10, + newReplicas: 0, + readyPodsFromOldRS: 10, + readyPodsFromNewRS: 0, + scaleExpected: true, + expectedOldReplicas: 8, + }, + { // expect unhealthy replicas from old replica sets been cleaned up + deploymentReplicas: 10, + maxUnavailable: intstr.FromInt(2), + oldReplicas: 10, + newReplicas: 0, + 
readyPodsFromOldRS: 8, + readyPodsFromNewRS: 0, + scaleExpected: true, + expectedOldReplicas: 8, + }, + { // expect 1 unhealthy replica from old replica sets been cleaned up, and 1 ready pod been scaled down + deploymentReplicas: 10, + maxUnavailable: intstr.FromInt(2), + oldReplicas: 10, + newReplicas: 0, + readyPodsFromOldRS: 9, + readyPodsFromNewRS: 0, + scaleExpected: true, + expectedOldReplicas: 8, + }, + { // the unavailable pods from the newRS would not make us scale down old RSs in a further step + deploymentReplicas: 10, + maxUnavailable: intstr.FromInt(2), + oldReplicas: 8, + newReplicas: 2, + readyPodsFromOldRS: 8, + readyPodsFromNewRS: 0, + scaleExpected: false, + }, + } + for i, test := range tests { + t.Logf("executing scenario %d", i) + + newSelector := map[string]string{"foo": "new"} + oldSelector := map[string]string{"foo": "old"} + newRS := rs("foo-new", test.newReplicas, newSelector, noTimestamp) + oldRS := rs("foo-old", test.oldReplicas, oldSelector, noTimestamp) + oldRSs := []*exp.ReplicaSet{oldRS} + allRSs := []*exp.ReplicaSet{oldRS, newRS} + + deployment := deployment("foo", test.deploymentReplicas, intstr.FromInt(0), test.maxUnavailable, newSelector) + fakeClientset := fake.Clientset{} + fakeClientset.AddReactor("list", "pods", func(action core.Action) (handled bool, ret runtime.Object, err error) { + switch action.(type) { + case core.ListAction: + podList := &api.PodList{} + for podIndex := 0; podIndex < test.readyPodsFromOldRS; podIndex++ { + podList.Items = append(podList.Items, api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: fmt.Sprintf("%s-oldReadyPod-%d", oldRS.Name, podIndex), + Labels: oldSelector, + }, + Status: api.PodStatus{ + Conditions: []api.PodCondition{ + { + Type: api.PodReady, + Status: api.ConditionTrue, + }, + }, + }, + }) + } + for podIndex := 0; podIndex < test.oldReplicas-test.readyPodsFromOldRS; podIndex++ { + podList.Items = append(podList.Items, api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: fmt.Sprintf("%s-oldUnhealthyPod-%d", oldRS.Name, podIndex), + Labels: oldSelector, + }, + Status: api.PodStatus{ + Conditions: []api.PodCondition{ + { + Type: api.PodReady, + Status: api.ConditionFalse, + }, + }, + }, + }) + } + for podIndex := 0; podIndex < test.readyPodsFromNewRS; podIndex++ { + podList.Items = append(podList.Items, api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: fmt.Sprintf("%s-newReadyPod-%d", oldRS.Name, podIndex), + Labels: newSelector, + }, + Status: api.PodStatus{ + Conditions: []api.PodCondition{ + { + Type: api.PodReady, + Status: api.ConditionTrue, + }, + }, + }, + }) + } + for podIndex := 0; podIndex < test.oldReplicas-test.readyPodsFromOldRS; podIndex++ { + podList.Items = append(podList.Items, api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: fmt.Sprintf("%s-newUnhealthyPod-%d", oldRS.Name, podIndex), + Labels: newSelector, + }, + Status: api.PodStatus{ + Conditions: []api.PodCondition{ + { + Type: api.PodReady, + Status: api.ConditionFalse, + }, + }, + }, + }) + } + return true, podList, nil + } + return false, nil, nil + }) + controller := &DeploymentController{ + client: &fakeClientset, + eventRecorder: &record.FakeRecorder{}, + } + + scaled, err := controller.reconcileOldReplicaSets(allRSs, oldRSs, newRS, &deployment) + if err != nil { + t.Errorf("unexpected error: %v", err) + continue + } + if !test.scaleExpected && scaled { + t.Errorf("unexpected scaling: %v", fakeClientset.Actions()) + } + if test.scaleExpected && !scaled { + t.Errorf("expected scaling to occur") + continue + } + continue + } +} + +func 
TestDeploymentController_cleanupUnhealthyReplicas(t *testing.T) { + tests := []struct { + oldReplicas int + readyPods int + unHealthyPods int + maxCleanupCount int + cleanupCountExpected int + }{ + { + oldReplicas: 10, + readyPods: 8, + unHealthyPods: 2, + maxCleanupCount: 1, + cleanupCountExpected: 1, + }, + { + oldReplicas: 10, + readyPods: 8, + unHealthyPods: 2, + maxCleanupCount: 3, + cleanupCountExpected: 2, + }, + { + oldReplicas: 10, + readyPods: 8, + unHealthyPods: 2, + maxCleanupCount: 0, + cleanupCountExpected: 0, + }, + { + oldReplicas: 10, + readyPods: 10, + unHealthyPods: 0, + maxCleanupCount: 3, + cleanupCountExpected: 0, + }, + } + + for i, test := range tests { + t.Logf("executing scenario %d", i) + oldRS := rs("foo-v2", test.oldReplicas, nil, noTimestamp) + oldRSs := []*exp.ReplicaSet{oldRS} + deployment := deployment("foo", 10, intstr.FromInt(2), intstr.FromInt(2), nil) + fakeClientset := fake.Clientset{} + fakeClientset.AddReactor("list", "pods", func(action core.Action) (handled bool, ret runtime.Object, err error) { + switch action.(type) { + case core.ListAction: + podList := &api.PodList{} + for podIndex := 0; podIndex < test.readyPods; podIndex++ { + podList.Items = append(podList.Items, api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: fmt.Sprintf("%s-readyPod-%d", oldRS.Name, podIndex), + }, + Status: api.PodStatus{ + Conditions: []api.PodCondition{ + { + Type: api.PodReady, + Status: api.ConditionTrue, + }, + }, + }, + }) + } + for podIndex := 0; podIndex < test.unHealthyPods; podIndex++ { + podList.Items = append(podList.Items, api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: fmt.Sprintf("%s-unHealthyPod-%d", oldRS.Name, podIndex), + }, + Status: api.PodStatus{ + Conditions: []api.PodCondition{ + { + Type: api.PodReady, + Status: api.ConditionFalse, + }, + }, + }, + }) + } + return true, podList, nil + } + return false, nil, nil + }) + + controller := &DeploymentController{ + client: &fakeClientset, + eventRecorder: &record.FakeRecorder{}, + } + _, cleanupCount, err := controller.cleanupUnhealthyReplicas(oldRSs, &deployment, 0, int32(test.maxCleanupCount)) + if err != nil { + t.Errorf("unexpected error: %v", err) + continue + } + if int(cleanupCount) != test.cleanupCountExpected { + t.Errorf("expected %v unhealthy replicas been cleaned up, got %v", test.cleanupCountExpected, cleanupCount) + continue + } + } +} + +func TestDeploymentController_scaleDownOldReplicaSetsForRollingUpdate(t *testing.T) { + tests := []struct { + deploymentReplicas int + maxUnavailable intstr.IntOrString + readyPods int + oldReplicas int + scaleExpected bool + expectedOldReplicas int + }{ + { + deploymentReplicas: 10, + maxUnavailable: intstr.FromInt(0), + readyPods: 10, + oldReplicas: 10, + scaleExpected: true, + expectedOldReplicas: 9, + }, + { + deploymentReplicas: 10, + maxUnavailable: intstr.FromInt(2), + readyPods: 10, + oldReplicas: 10, + scaleExpected: true, + expectedOldReplicas: 8, + }, + { + deploymentReplicas: 10, + maxUnavailable: intstr.FromInt(2), + readyPods: 8, + oldReplicas: 10, + scaleExpected: false, + }, + { + deploymentReplicas: 10, + maxUnavailable: intstr.FromInt(2), + readyPods: 10, + oldReplicas: 0, + scaleExpected: false, + }, + { + deploymentReplicas: 10, + maxUnavailable: intstr.FromInt(2), + readyPods: 1, + oldReplicas: 10, + scaleExpected: false, + }, + } + + for i, test := range tests { + t.Logf("executing scenario %d", i) + oldRS := rs("foo-v2", test.oldReplicas, nil, noTimestamp) + allRSs := []*exp.ReplicaSet{oldRS} + oldRSs := []*exp.ReplicaSet{oldRS} + 
deployment := deployment("foo", test.deploymentReplicas, intstr.FromInt(0), test.maxUnavailable, map[string]string{"foo": "bar"}) + fakeClientset := fake.Clientset{} + fakeClientset.AddReactor("list", "pods", func(action core.Action) (handled bool, ret runtime.Object, err error) { + switch action.(type) { + case core.ListAction: + podList := &api.PodList{} + for podIndex := 0; podIndex < test.readyPods; podIndex++ { + podList.Items = append(podList.Items, api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: fmt.Sprintf("%s-pod-%d", oldRS.Name, podIndex), + Labels: map[string]string{"foo": "bar"}, + }, + Status: api.PodStatus{ + Conditions: []api.PodCondition{ + { + Type: api.PodReady, + Status: api.ConditionTrue, + }, + }, + }, + }) + } + return true, podList, nil + } + return false, nil, nil + }) + controller := &DeploymentController{ + client: &fakeClientset, + eventRecorder: &record.FakeRecorder{}, + } + scaled, err := controller.scaleDownOldReplicaSetsForRollingUpdate(allRSs, oldRSs, &deployment) + if err != nil { + t.Errorf("unexpected error: %v", err) + continue + } + if !test.scaleExpected { + if scaled != 0 { + t.Errorf("unexpected scaling: %v", fakeClientset.Actions()) + } + continue + } + if test.scaleExpected && scaled == 0 { + t.Errorf("expected scaling to occur; actions: %v", fakeClientset.Actions()) + continue + } + // There are both list and update actions logged, so extract the update + // action for verification. + var updateAction core.UpdateAction + for _, action := range fakeClientset.Actions() { + switch a := action.(type) { + case core.UpdateAction: + if updateAction != nil { + t.Errorf("expected only 1 update action; had %v and found %v", updateAction, a) + } else { + updateAction = a + } + } + } + if updateAction == nil { + t.Errorf("expected an update action") + continue + } + updated := updateAction.GetObject().(*exp.ReplicaSet) + if e, a := test.expectedOldReplicas, int(updated.Spec.Replicas); e != a { + t.Errorf("expected update to %d replicas, got %d", e, a) + } + } +} diff --git a/pkg/controller/deployment/sync.go b/pkg/controller/deployment/sync.go new file mode 100644 index 00000000000..ff3b37828af --- /dev/null +++ b/pkg/controller/deployment/sync.go @@ -0,0 +1,527 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package deployment + +import ( + "fmt" + "reflect" + "sort" + "strconv" + + "github.com/golang/glog" + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/errors" + "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/apis/extensions" + "k8s.io/kubernetes/pkg/controller" + deploymentutil "k8s.io/kubernetes/pkg/util/deployment" + utilerrors "k8s.io/kubernetes/pkg/util/errors" + labelsutil "k8s.io/kubernetes/pkg/util/labels" + podutil "k8s.io/kubernetes/pkg/util/pod" + rsutil "k8s.io/kubernetes/pkg/util/replicaset" +) + +// sync is responsible for reconciling deployments on scaling events or when they +// are paused. 
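+// It scales all replica sets proportionally to their sizes (see scale below),
+// prunes old replica sets that exceed the deployment's revision history limit,
+// and then syncs the deployment status. Actual rollouts are driven by
+// rolloutRolling and rolloutRecreate, not by this path.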
+func (dc *DeploymentController) sync(deployment *extensions.Deployment) error { + newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(deployment, false) + if err != nil { + return err + } + if err := dc.scale(deployment, newRS, oldRSs); err != nil { + // If we get an error while trying to scale, the deployment will be requeued + // so we can abort this resync + return err + } + dc.cleanupDeployment(oldRSs, deployment) + + allRSs := append(oldRSs, newRS) + return dc.syncDeploymentStatus(allRSs, newRS, deployment) +} + +// getAllReplicaSetsAndSyncRevision returns all the replica sets for the provided deployment (new and all old), with new RS's and deployment's revision updated. +// 1. Get all old RSes this deployment targets, and calculate the max revision number among them (maxOldV). +// 2. Get new RS this deployment targets (whose pod template matches deployment's), and update new RS's revision number to (maxOldV + 1), +// only if its revision number is smaller than (maxOldV + 1). If this step failed, we'll update it in the next deployment sync loop. +// 3. Copy new RS's revision number to deployment (update deployment's revision). If this step failed, we'll update it in the next deployment sync loop. +// Note that currently the deployment controller is using caches to avoid querying the server for reads. +// This may lead to stale reads of replica sets, thus incorrect deployment status. +func (dc *DeploymentController) getAllReplicaSetsAndSyncRevision(deployment *extensions.Deployment, createIfNotExisted bool) (*extensions.ReplicaSet, []*extensions.ReplicaSet, error) { + // List the deployment's RSes & Pods and apply pod-template-hash info to deployment's adopted RSes/Pods + rsList, podList, err := dc.rsAndPodsWithHashKeySynced(deployment) + if err != nil { + return nil, nil, fmt.Errorf("error labeling replica sets and pods with pod-template-hash: %v", err) + } + _, allOldRSs, err := deploymentutil.FindOldReplicaSets(deployment, rsList, podList) + if err != nil { + return nil, nil, err + } + + // Calculate the max revision number among all old RSes + maxOldV := maxRevision(allOldRSs) + + // Get new replica set with the updated revision number + newRS, err := dc.getNewReplicaSet(deployment, rsList, maxOldV, allOldRSs, createIfNotExisted) + if err != nil { + return nil, nil, err + } + + // Sync deployment's revision number with new replica set + if newRS != nil && newRS.Annotations != nil && len(newRS.Annotations[deploymentutil.RevisionAnnotation]) > 0 && + (deployment.Annotations == nil || deployment.Annotations[deploymentutil.RevisionAnnotation] != newRS.Annotations[deploymentutil.RevisionAnnotation]) { + if err = dc.updateDeploymentRevision(deployment, newRS.Annotations[deploymentutil.RevisionAnnotation]); err != nil { + glog.V(4).Infof("Error: %v. Unable to update deployment revision, will retry later.", err) + } + } + + return newRS, allOldRSs, nil +} + +// rsAndPodsWithHashKeySynced returns the RSes and pods the given deployment targets, with pod-template-hash information synced. 
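+// Note that the replica sets and pods are listed from the controller's local
+// stores, so, as with getAllReplicaSetsAndSyncRevision above, the results may
+// briefly lag behind the API server.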
+func (dc *DeploymentController) rsAndPodsWithHashKeySynced(deployment *extensions.Deployment) ([]extensions.ReplicaSet, *api.PodList, error) {
+	rsList, err := deploymentutil.ListReplicaSets(deployment,
+		func(namespace string, options api.ListOptions) ([]extensions.ReplicaSet, error) {
+			return dc.rsStore.ReplicaSets(namespace).List(options.LabelSelector)
+		})
+	if err != nil {
+		return nil, nil, fmt.Errorf("error listing ReplicaSets: %v", err)
+	}
+	syncedRSList := []extensions.ReplicaSet{}
+	for _, rs := range rsList {
+		// Add pod-template-hash information if it's not in the RS.
+		// Otherwise, a new RS produced by the Deployment will overlap with pre-existing ones
+		// that aren't constrained by the pod-template-hash.
+		syncedRS, err := dc.addHashKeyToRSAndPods(rs)
+		if err != nil {
+			return nil, nil, err
+		}
+		syncedRSList = append(syncedRSList, *syncedRS)
+	}
+	syncedPodList, err := dc.listPods(deployment)
+	if err != nil {
+		return nil, nil, err
+	}
+	return syncedRSList, syncedPodList, nil
+}
+
+// addHashKeyToRSAndPods adds pod-template-hash information to the given rs, if it's not already there, with the following steps:
+// 1. Add the hash label to the rs's pod template, and make sure the controller sees this update so that no orphaned pods will be created
+// 2. Add the hash label to all pods this rs owns, and wait until the replicaset controller reports rs.Status.FullyLabeledReplicas equal to the desired number of replicas
+// 3. Add the hash label to the rs's label and selector
+func (dc *DeploymentController) addHashKeyToRSAndPods(rs extensions.ReplicaSet) (updatedRS *extensions.ReplicaSet, err error) {
+	updatedRS = &rs
+	// If the rs already has the new hash label in its selector, it's done syncing
+	if labelsutil.SelectorHasLabel(rs.Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey) {
+		return
+	}
+	namespace := rs.Namespace
+	hash := rsutil.GetPodTemplateSpecHash(rs)
+	rsUpdated := false
+	// 1. Add the hash template label to the rs. This ensures that any newly created pods will have the new label.
+	updatedRS, rsUpdated, err = rsutil.UpdateRSWithRetries(dc.client.Extensions().ReplicaSets(namespace), updatedRS,
+		func(updated *extensions.ReplicaSet) error {
+			// Precondition: the RS doesn't contain the new hash in its pod template label.
+			if updated.Spec.Template.Labels[extensions.DefaultDeploymentUniqueLabelKey] == hash {
+				return utilerrors.ErrPreconditionViolated
+			}
+			updated.Spec.Template.Labels = labelsutil.AddLabel(updated.Spec.Template.Labels, extensions.DefaultDeploymentUniqueLabelKey, hash)
+			return nil
+		})
+	if err != nil {
+		return nil, fmt.Errorf("error updating %s %s/%s pod template label with template hash: %v", updatedRS.Kind, updatedRS.Namespace, updatedRS.Name, err)
+	}
+	if !rsUpdated {
+		// If the RS wasn't updated but step 1 didn't return an error, we've hit an RS not found error.
+		// Return here and retry in the next sync loop.
+		return &rs, nil
+	}
+	// Make sure the rs pod template is updated so that it won't create pods without the new label (orphaned pods).
+	if updatedRS.Generation > updatedRS.Status.ObservedGeneration {
+		if err = deploymentutil.WaitForReplicaSetUpdated(dc.client, updatedRS.Generation, namespace, updatedRS.Name); err != nil {
+			return nil, fmt.Errorf("error waiting for %s %s/%s generation %d to be observed by the controller: %v", updatedRS.Kind, updatedRS.Namespace, updatedRS.Name, updatedRS.Generation, err)
+		}
+	}
+	glog.V(4).Infof("Observed the update of %s %s/%s's pod template with hash %s.", rs.Kind, rs.Namespace, rs.Name, hash)
+
+	// 2. Update all pods managed by the rs to have the new hash label, so they will be correctly adopted.
+	selector, err := unversioned.LabelSelectorAsSelector(updatedRS.Spec.Selector)
+	if err != nil {
+		return nil, fmt.Errorf("error converting selector to label selector for replica set %s: %s", updatedRS.Name, err)
+	}
+	options := api.ListOptions{LabelSelector: selector}
+	podList, err := dc.podStore.Pods(namespace).List(options.LabelSelector)
+	if err != nil {
+		return nil, fmt.Errorf("error getting pod list for namespace %s and list options %+v: %s", namespace, options, err)
+	}
+	allPodsLabeled := false
+	if allPodsLabeled, err = deploymentutil.LabelPodsWithHash(&podList, updatedRS, dc.client, namespace, hash); err != nil {
+		return nil, fmt.Errorf("error adding template hash label %s to pods %+v: %s", hash, podList, err)
+	}
+	// If not all pods are labeled but step 2 didn't return an error, we've hit at least one pod not found error.
+	// Return here and retry in the next sync loop.
+	if !allPodsLabeled {
+		return updatedRS, nil
+	}
+
+	// We need to wait for the replicaset controller to observe the pods being
+	// labeled with the pod template hash. Because we've previously called
+	// WaitForReplicaSetUpdated, the replicaset controller should have dropped
+	// FullyLabeledReplicas to 0 already; we only need to wait for it to increase
+	// back to the number of replicas in the spec.
+	if err = deploymentutil.WaitForPodsHashPopulated(dc.client, updatedRS.Generation, namespace, updatedRS.Name); err != nil {
+		return nil, fmt.Errorf("%s %s/%s: error waiting for replicaset controller to observe pods being labeled with template hash: %v", updatedRS.Kind, updatedRS.Namespace, updatedRS.Name, err)
+	}
+
+	// 3. Update the rs label and selector to include the new hash label
+	// Copy the old selector, so that we can scrub out any orphaned pods
+	if updatedRS, rsUpdated, err = rsutil.UpdateRSWithRetries(dc.client.Extensions().ReplicaSets(namespace), updatedRS,
+		func(updated *extensions.ReplicaSet) error {
+			// Precondition: the RS doesn't contain the new hash in its label or selector.
+			if updated.Labels[extensions.DefaultDeploymentUniqueLabelKey] == hash && updated.Spec.Selector.MatchLabels[extensions.DefaultDeploymentUniqueLabelKey] == hash {
+				return utilerrors.ErrPreconditionViolated
+			}
+			updated.Labels = labelsutil.AddLabel(updated.Labels, extensions.DefaultDeploymentUniqueLabelKey, hash)
+			updated.Spec.Selector = labelsutil.AddLabelToSelector(updated.Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey, hash)
+			return nil
+		}); err != nil {
+		return nil, fmt.Errorf("error updating %s %s/%s label and selector with template hash: %v", updatedRS.Kind, updatedRS.Namespace, updatedRS.Name, err)
+	}
+	if rsUpdated {
+		glog.V(4).Infof("Updated %s %s/%s's selector and label with hash %s.", rs.Kind, rs.Namespace, rs.Name, hash)
+	}
+	// If the RS isn't actually updated in step 3, that's okay; we'll retry in the next sync loop since its selector isn't updated yet.
+
+	// TODO: look for orphaned pods and label them in the background somewhere else periodically
+
+	return updatedRS, nil
+}
+
+func (dc *DeploymentController) listPods(deployment *extensions.Deployment) (*api.PodList, error) {
+	return deploymentutil.ListPods(deployment,
+		func(namespace string, options api.ListOptions) (*api.PodList, error) {
+			podList, err := dc.podStore.Pods(namespace).List(options.LabelSelector)
+			return &podList, err
+		})
+}
+
+// getNewReplicaSet returns a replica set that matches the intent of the given deployment.
+// It returns nil if the new replica set doesn't exist yet.
+// 1. Get the existing new RS (the RS that the given deployment targets, whose pod template is the same as the deployment's).
+// 2. If there's an existing new RS, update its revision number if it's smaller than (maxOldRevision + 1), where maxOldRevision is the max revision number among all old RSes.
+// 3. If there's no existing new RS and createIfNotExisted is true, create one with the appropriate revision number (maxOldRevision + 1) and replicas.
+// Note that the pod-template-hash will be added to adopted RSes and pods.
+func (dc *DeploymentController) getNewReplicaSet(deployment *extensions.Deployment, rsList []extensions.ReplicaSet, maxOldRevision int64, oldRSs []*extensions.ReplicaSet, createIfNotExisted bool) (*extensions.ReplicaSet, error) {
+	// Calculate the revision number for this new replica set
+	newRevision := strconv.FormatInt(maxOldRevision+1, 10)
+
+	existingNewRS, err := deploymentutil.FindNewReplicaSet(deployment, rsList)
+	if err != nil {
+		return nil, err
+	} else if existingNewRS != nil {
+		// Set the existing new replica set's annotations
+		if setNewReplicaSetAnnotations(deployment, existingNewRS, newRevision, true) {
+			return dc.client.Extensions().ReplicaSets(deployment.ObjectMeta.Namespace).Update(existingNewRS)
+		}
+		return existingNewRS, nil
+	}
+
+	if !createIfNotExisted {
+		return nil, nil
+	}
+
+	// The new ReplicaSet does not exist; create one.
+	namespace := deployment.ObjectMeta.Namespace
+	podTemplateSpecHash := podutil.GetPodTemplateSpecHash(deployment.Spec.Template)
+	newRSTemplate := deploymentutil.GetNewReplicaSetTemplate(deployment)
+	// Add the podTemplateHash label to the selector.
+	newRSSelector := labelsutil.CloneSelectorAndAddLabel(deployment.Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey, podTemplateSpecHash)
+
+	// Create the new ReplicaSet
+	newRS := extensions.ReplicaSet{
+		ObjectMeta: api.ObjectMeta{
+			// Make the name deterministic, to ensure idempotence
+			Name:      deployment.Name + "-" + fmt.Sprintf("%d", podTemplateSpecHash),
+			Namespace: namespace,
+		},
+		Spec: extensions.ReplicaSetSpec{
+			Replicas: 0,
+			Selector: newRSSelector,
+			Template: newRSTemplate,
+		},
+	}
+	allRSs := append(oldRSs, &newRS)
+	newReplicasCount, err := deploymentutil.NewRSNewReplicas(deployment, allRSs, &newRS)
+	if err != nil {
+		return nil, err
+	}
+
+	newRS.Spec.Replicas = newReplicasCount
+	// Set the new replica set's annotations
+	setNewReplicaSetAnnotations(deployment, &newRS, newRevision, false)
+	createdRS, err := dc.client.Extensions().ReplicaSets(namespace).Create(&newRS)
+	if err != nil {
+		dc.enqueueDeployment(deployment)
+		return nil, fmt.Errorf("error creating replica set %v: %v", deployment.Name, err)
+	}
+	if newReplicasCount > 0 {
+		dc.eventRecorder.Eventf(deployment, api.EventTypeNormal, "ScalingReplicaSet", "Scaled %s replica set %s to %d", "up", createdRS.Name, newReplicasCount)
+	}
+
+	return createdRS, dc.updateDeploymentRevision(deployment, newRevision)
+}
+
+func (dc *DeploymentController) updateDeploymentRevision(deployment *extensions.Deployment, revision string) error {
+	if deployment.Annotations == nil {
+		deployment.Annotations = make(map[string]string)
+	}
+	if deployment.Annotations[deploymentutil.RevisionAnnotation] != revision {
+		deployment.Annotations[deploymentutil.RevisionAnnotation] = revision
+		_, err := dc.client.Extensions().Deployments(deployment.ObjectMeta.Namespace).Update(deployment)
+		return err
+	}
+	return nil
+}
+
+// scale scales proportionally in order to mitigate risk. Otherwise, scaling up can increase the size
+// of the new replica set and scaling down can decrease the sizes of the old ones, both of which would
+// have the effect of hastening the rollout progress, which could produce a higher proportion of unavailable
+// replicas in the event of a problem with the rolled out template. Should run only on scaling events or
+// when a deployment is paused and not during the normal rollout process.
+func (dc *DeploymentController) scale(deployment *extensions.Deployment, newRS *extensions.ReplicaSet, oldRSs []*extensions.ReplicaSet) error {
+	// If there is only one active replica set then we should scale that up to the full count of the
+	// deployment. If there is no active replica set, then we should scale up the newest replica set.
+	if activeOrLatest := findActiveOrLatest(newRS, oldRSs); activeOrLatest != nil {
+		if activeOrLatest.Spec.Replicas == deployment.Spec.Replicas {
+			return nil
+		}
+		_, _, err := dc.scaleReplicaSetAndRecordEvent(activeOrLatest, deployment.Spec.Replicas, deployment)
+		return err
+	}
+
+	// If the new replica set is saturated, old replica sets should be fully scaled down.
+	// This case handles replica set adoption during a saturated new replica set.
+	if deploymentutil.IsSaturated(deployment, newRS) {
+		for _, old := range controller.FilterActiveReplicaSets(oldRSs) {
+			if _, _, err := dc.scaleReplicaSetAndRecordEvent(old, 0, deployment); err != nil {
+				return err
+			}
+		}
+		return nil
+	}
+
+	// There are old replica sets with pods and the new replica set is not saturated.
+	// We need to proportionally scale all replica sets (new and old) in case of a
+	// rolling deployment.
+	if deploymentutil.IsRollingUpdate(deployment) {
+		allRSs := controller.FilterActiveReplicaSets(append(oldRSs, newRS))
+		allRSsReplicas := deploymentutil.GetReplicaCountForReplicaSets(allRSs)
+
+		allowedSize := int32(0)
+		if deployment.Spec.Replicas > 0 {
+			allowedSize = deployment.Spec.Replicas + maxSurge(*deployment)
+		}
+
+		// Number of additional replicas that can be either added or removed from the total
+		// replicas count. These replicas should be distributed proportionally to the active
+		// replica sets.
+		deploymentReplicasToAdd := allowedSize - allRSsReplicas
+
+		// The additional replicas should be distributed proportionally amongst the active
+		// replica sets, from the largest to the smallest replica set. The scaling direction
+		// drives what happens when we try to scale replica sets of the same size.
+		// In such a case when scaling up, we should scale up newer replica sets first, and
+		// when scaling down, we should scale down older replica sets first.
+		scalingOperation := "up"
+		switch {
+		case deploymentReplicasToAdd > 0:
+			sort.Sort(controller.ReplicaSetsBySizeNewer(allRSs))
+
+		case deploymentReplicasToAdd < 0:
+			sort.Sort(controller.ReplicaSetsBySizeOlder(allRSs))
+			scalingOperation = "down"
+
+		default: /* deploymentReplicasToAdd == 0 */
+			// Nothing to add.
+			return nil
+		}
+
+		// Iterate over all active replica sets and estimate proportions for each of them.
+		// The absolute value of deploymentReplicasAdded should never exceed the absolute
+		// value of deploymentReplicasToAdd.
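+		//
+		// For illustration (borrowing the "deployment with surge pods" case from
+		// sync_test.go in this patch): a deployment previously at 10 replicas with
+		// maxSurge=2 is scaled to 20 while foo-v2 and foo-v1 hold 6 replicas each.
+		// Then allowedSize = 20+2 = 22 and deploymentReplicasToAdd = 22-12 = 10;
+		// each replica set grows roughly in proportion to its share of the allowed
+		// size (6*22/12 = 11), ending at 11 replicas apiece.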
+		deploymentReplicasAdded := int32(0)
+		for i := range allRSs {
+			rs := allRSs[i]
+
+			proportion := getProportion(rs, *deployment, deploymentReplicasToAdd, deploymentReplicasAdded)
+
+			rs.Spec.Replicas += proportion
+			deploymentReplicasAdded += proportion
+		}
+
+		// Update all replica sets
+		for i := range allRSs {
+			rs := allRSs[i]
+
+			// Add/remove any leftover replicas to/from the largest replica set.
+			if i == 0 {
+				leftover := deploymentReplicasToAdd - deploymentReplicasAdded
+				rs.Spec.Replicas += leftover
+				if rs.Spec.Replicas < 0 {
+					rs.Spec.Replicas = 0
+				}
+			}
+
+			if _, err := dc.scaleReplicaSet(rs, rs.Spec.Replicas, deployment, scalingOperation); err != nil {
+				// Return as soon as we fail; the deployment is requeued
+				return err
+			}
+		}
+	}
+	return nil
+}
+
+func (dc *DeploymentController) scaleReplicaSetAndRecordEvent(rs *extensions.ReplicaSet, newScale int32, deployment *extensions.Deployment) (bool, *extensions.ReplicaSet, error) {
+	// No need to scale
+	if rs.Spec.Replicas == newScale {
+		return false, rs, nil
+	}
+	var scalingOperation string
+	if rs.Spec.Replicas < newScale {
+		scalingOperation = "up"
+	} else {
+		scalingOperation = "down"
+	}
+	newRS, err := dc.scaleReplicaSet(rs, newScale, deployment, scalingOperation)
+	return true, newRS, err
+}
+
+func (dc *DeploymentController) scaleReplicaSet(rs *extensions.ReplicaSet, newScale int32, deployment *extensions.Deployment, scalingOperation string) (*extensions.ReplicaSet, error) {
+	// NOTE: This mutates the ReplicaSet passed in. Not sure if that's a good idea.
+	rs.Spec.Replicas = newScale
+	setReplicasAnnotations(rs, deployment.Spec.Replicas, deployment.Spec.Replicas+maxSurge(*deployment))
+	rs, err := dc.client.Extensions().ReplicaSets(rs.ObjectMeta.Namespace).Update(rs)
+	if err == nil {
+		dc.eventRecorder.Eventf(deployment, api.EventTypeNormal, "ScalingReplicaSet", "Scaled %s replica set %s to %d", scalingOperation, rs.Name, newScale)
+	} else {
+		glog.Warningf("Cannot update replica set %q: %v", rs.Name, err)
+		dc.enqueueDeployment(deployment)
+	}
+	return rs, err
+}
+
+// cleanupDeployment is responsible for cleaning up a deployment, i.e. it retains all but the latest N old replica sets,
+// where N=d.Spec.RevisionHistoryLimit. Old replica sets are older versions of a deployment's pod template kept
+// around by default 1) for historical reasons and 2) for the ability to roll back a deployment.
+func (dc *DeploymentController) cleanupDeployment(oldRSs []*extensions.ReplicaSet, deployment *extensions.Deployment) error {
+	if deployment.Spec.RevisionHistoryLimit == nil {
+		return nil
+	}
+	diff := int32(len(oldRSs)) - *deployment.Spec.RevisionHistoryLimit
+	if diff <= 0 {
+		return nil
+	}
+
+	sort.Sort(controller.ReplicaSetsByCreationTimestamp(oldRSs))
+
+	var errList []error
+	// TODO: This should be parallelized.
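+	// oldRSs is now sorted oldest-first, so the loop below deletes the `diff` oldest
+	// replica sets. Replica sets that still have replicas, or whose latest spec hasn't
+	// been observed yet, are skipped, so fewer than `diff` may actually be deleted.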
+	for i := int32(0); i < diff; i++ {
+		rs := oldRSs[i]
+		// Avoid deleting replica sets with non-zero replica counts
+		if rs.Status.Replicas != 0 || rs.Spec.Replicas != 0 || rs.Generation > rs.Status.ObservedGeneration {
+			continue
+		}
+		if err := dc.client.Extensions().ReplicaSets(rs.Namespace).Delete(rs.Name, nil); err != nil && !errors.IsNotFound(err) {
+			glog.V(2).Infof("Failed deleting old replica set %v for deployment %v: %v", rs.Name, deployment.Name, err)
+			errList = append(errList, err)
+		}
+	}
+
+	return utilerrors.NewAggregate(errList)
+}
+
+// syncDeploymentStatus checks if the status is up-to-date and syncs it if necessary
+func (dc *DeploymentController) syncDeploymentStatus(allRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, d *extensions.Deployment) error {
+	newStatus, err := dc.calculateStatus(allRSs, newRS, d)
+	if err != nil {
+		return err
+	}
+	if !reflect.DeepEqual(d.Status, newStatus) {
+		return dc.updateDeploymentStatus(allRSs, newRS, d)
+	}
+	return nil
+}
+
+func (dc *DeploymentController) calculateStatus(allRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, deployment *extensions.Deployment) (extensions.DeploymentStatus, error) {
+	availableReplicas, err := dc.getAvailablePodsForReplicaSets(deployment, allRSs)
+	if err != nil {
+		return deployment.Status, fmt.Errorf("failed to count available pods: %v", err)
+	}
+	totalReplicas := deploymentutil.GetReplicaCountForReplicaSets(allRSs)
+
+	return extensions.DeploymentStatus{
+		// TODO: Ensure that if we start retrying status updates, we won't pick up a new Generation value.
+		ObservedGeneration:  deployment.Generation,
+		Replicas:            deploymentutil.GetActualReplicaCountForReplicaSets(allRSs),
+		UpdatedReplicas:     deploymentutil.GetActualReplicaCountForReplicaSets([]*extensions.ReplicaSet{newRS}),
+		AvailableReplicas:   availableReplicas,
+		UnavailableReplicas: totalReplicas - availableReplicas,
+	}, nil
+}
+
+func (dc *DeploymentController) getAvailablePodsForReplicaSets(deployment *extensions.Deployment, rss []*extensions.ReplicaSet) (int32, error) {
+	podList, err := dc.listPods(deployment)
+	if err != nil {
+		return 0, err
+	}
+	return deploymentutil.CountAvailablePodsForReplicaSets(podList, rss, deployment.Spec.MinReadySeconds)
+}
+
+func (dc *DeploymentController) updateDeploymentStatus(allRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, deployment *extensions.Deployment) error {
+	newStatus, err := dc.calculateStatus(allRSs, newRS, deployment)
+	if err != nil {
+		return err
+	}
+	newDeployment := deployment
+	newDeployment.Status = newStatus
+	_, err = dc.client.Extensions().Deployments(deployment.Namespace).UpdateStatus(newDeployment)
+	return err
+}
+
+// isScalingEvent checks whether the provided deployment has been updated with a scaling event
+// by looking at the desired-replicas annotation in the active replica sets of the deployment.
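+// For example, if a deployment running 3 replicas is updated to 5 replicas, its active
+// replica sets still carry desired-replicas=3 in their annotations; the mismatch with
+// d.Spec.Replicas marks the change as a scaling event rather than a new rollout.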
+func (dc *DeploymentController) isScalingEvent(d *extensions.Deployment) bool {
+	newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, false)
+	if err != nil {
+		return false
+	}
+	// If there is no new replica set matching this deployment and the deployment isn't paused,
+	// then there is a new rollout waiting to happen
+	if newRS == nil && !d.Spec.Paused {
+		return false
+	}
+	allRSs := append(oldRSs, newRS)
+	for _, rs := range controller.FilterActiveReplicaSets(allRSs) {
+		desired, ok := getDesiredReplicasAnnotation(rs)
+		if !ok {
+			continue
+		}
+		if desired != d.Spec.Replicas {
+			return true
+		}
+	}
+	return false
+}
diff --git a/pkg/controller/deployment/sync_test.go b/pkg/controller/deployment/sync_test.go
new file mode 100644
index 00000000000..d08cf3d0777
--- /dev/null
+++ b/pkg/controller/deployment/sync_test.go
@@ -0,0 +1,348 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package deployment
+
+import (
+	"testing"
+	"time"
+
+	"k8s.io/kubernetes/pkg/api/unversioned"
+	exp "k8s.io/kubernetes/pkg/apis/extensions"
+	"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
+	"k8s.io/kubernetes/pkg/client/record"
+	"k8s.io/kubernetes/pkg/controller"
+	"k8s.io/kubernetes/pkg/util/intstr"
+)
+
+func TestScale(t *testing.T) {
+	newTimestamp := unversioned.Date(2016, 5, 20, 2, 0, 0, 0, time.UTC)
+	oldTimestamp := unversioned.Date(2016, 5, 20, 1, 0, 0, 0, time.UTC)
+	olderTimestamp := unversioned.Date(2016, 5, 20, 0, 0, 0, 0, time.UTC)
+
+	tests := []struct {
+		name          string
+		deployment    *exp.Deployment
+		oldDeployment *exp.Deployment
+
+		newRS  *exp.ReplicaSet
+		oldRSs []*exp.ReplicaSet
+
+		expectedNew *exp.ReplicaSet
+		expectedOld []*exp.ReplicaSet
+
+		desiredReplicasAnnotations map[string]int32
+	}{
+		{
+			name:          "normal scaling event: 10 -> 12",
+			deployment:    newDeployment(12, nil),
+			oldDeployment: newDeployment(10, nil),
+
+			newRS:  rs("foo-v1", 10, nil, newTimestamp),
+			oldRSs: []*exp.ReplicaSet{},
+
+			expectedNew: rs("foo-v1", 12, nil, newTimestamp),
+			expectedOld: []*exp.ReplicaSet{},
+		},
+		{
+			name:          "normal scaling event: 10 -> 5",
+			deployment:    newDeployment(5, nil),
+			oldDeployment: newDeployment(10, nil),
+
+			newRS:  rs("foo-v1", 10, nil, newTimestamp),
+			oldRSs: []*exp.ReplicaSet{},
+
+			expectedNew: rs("foo-v1", 5, nil, newTimestamp),
+			expectedOld: []*exp.ReplicaSet{},
+		},
+		{
+			name:          "proportional scaling: 5 -> 10",
+			deployment:    newDeployment(10, nil),
+			oldDeployment: newDeployment(5, nil),
+
+			newRS:  rs("foo-v2", 2, nil, newTimestamp),
+			oldRSs: []*exp.ReplicaSet{rs("foo-v1", 3, nil, oldTimestamp)},
+
+			expectedNew: rs("foo-v2", 4, nil, newTimestamp),
+			expectedOld: []*exp.ReplicaSet{rs("foo-v1", 6, nil, oldTimestamp)},
+		},
+		{
+			name:          "proportional scaling: 5 -> 3",
+			deployment:    newDeployment(3, nil),
+			oldDeployment: newDeployment(5, nil),
+
+			newRS:  rs("foo-v2", 2, nil, newTimestamp),
+			oldRSs: []*exp.ReplicaSet{rs("foo-v1", 3, nil, oldTimestamp)},
+
+			expectedNew: rs("foo-v2", 1, nil, newTimestamp),
+			expectedOld: []*exp.ReplicaSet{rs("foo-v1", 2, nil, oldTimestamp)},
+		},
+		{
+			name:          "proportional scaling: 9 -> 4",
+			deployment:    newDeployment(4, nil),
+			oldDeployment: newDeployment(9, nil),
+
+			newRS:  rs("foo-v2", 8, nil, newTimestamp),
+			oldRSs: []*exp.ReplicaSet{rs("foo-v1", 1, nil, oldTimestamp)},
+
+			expectedNew: rs("foo-v2", 4, nil, newTimestamp),
+			expectedOld: []*exp.ReplicaSet{rs("foo-v1", 0, nil, oldTimestamp)},
+		},
+		{
+			name:          "proportional scaling: 7 -> 10",
+			deployment:    newDeployment(10, nil),
+			oldDeployment: newDeployment(7, nil),
+
+			newRS:  rs("foo-v3", 2, nil, newTimestamp),
+			oldRSs: []*exp.ReplicaSet{rs("foo-v2", 3, nil, oldTimestamp), rs("foo-v1", 2, nil, olderTimestamp)},
+
+			expectedNew: rs("foo-v3", 3, nil, newTimestamp),
+			expectedOld: []*exp.ReplicaSet{rs("foo-v2", 4, nil, oldTimestamp), rs("foo-v1", 3, nil, olderTimestamp)},
+		},
+		{
+			name:          "proportional scaling: 13 -> 8",
+			deployment:    newDeployment(8, nil),
+			oldDeployment: newDeployment(13, nil),
+
+			newRS:  rs("foo-v3", 2, nil, newTimestamp),
+			oldRSs: []*exp.ReplicaSet{rs("foo-v2", 8, nil, oldTimestamp), rs("foo-v1", 3, nil, olderTimestamp)},
+
+			expectedNew: rs("foo-v3", 1, nil, newTimestamp),
+			expectedOld: []*exp.ReplicaSet{rs("foo-v2", 5, nil, oldTimestamp), rs("foo-v1", 2, nil, olderTimestamp)},
+		},
+		// Scales up the new replica set.
+		{
+			name:          "leftover distribution: 3 -> 4",
+			deployment:    newDeployment(4, nil),
+			oldDeployment: newDeployment(3, nil),
+
+			newRS:  rs("foo-v3", 1, nil, newTimestamp),
+			oldRSs: []*exp.ReplicaSet{rs("foo-v2", 1, nil, oldTimestamp), rs("foo-v1", 1, nil, olderTimestamp)},
+
+			expectedNew: rs("foo-v3", 2, nil, newTimestamp),
+			expectedOld: []*exp.ReplicaSet{rs("foo-v2", 1, nil, oldTimestamp), rs("foo-v1", 1, nil, olderTimestamp)},
+		},
+		// Scales down the older replica set.
+		{
+			name:          "leftover distribution: 3 -> 2",
+			deployment:    newDeployment(2, nil),
+			oldDeployment: newDeployment(3, nil),
+
+			newRS:  rs("foo-v3", 1, nil, newTimestamp),
+			oldRSs: []*exp.ReplicaSet{rs("foo-v2", 1, nil, oldTimestamp), rs("foo-v1", 1, nil, olderTimestamp)},
+
+			expectedNew: rs("foo-v3", 1, nil, newTimestamp),
+			expectedOld: []*exp.ReplicaSet{rs("foo-v2", 1, nil, oldTimestamp), rs("foo-v1", 0, nil, olderTimestamp)},
+		},
+		// Scales up the latest replica set first.
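+		// (With equally sized active replica sets, scaling up sorts newer ones
+		// first, so the single extra replica lands on foo-v2 rather than foo-v1.)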
+		{
+			name:          "proportional scaling (no new rs): 4 -> 5",
+			deployment:    newDeployment(5, nil),
+			oldDeployment: newDeployment(4, nil),
+
+			newRS:  nil,
+			oldRSs: []*exp.ReplicaSet{rs("foo-v2", 2, nil, oldTimestamp), rs("foo-v1", 2, nil, olderTimestamp)},
+
+			expectedNew: nil,
+			expectedOld: []*exp.ReplicaSet{rs("foo-v2", 3, nil, oldTimestamp), rs("foo-v1", 2, nil, olderTimestamp)},
+		},
+		// Scales down to zero
+		{
+			name:          "proportional scaling: 6 -> 0",
+			deployment:    newDeployment(0, nil),
+			oldDeployment: newDeployment(6, nil),
+
+			newRS:  rs("foo-v3", 3, nil, newTimestamp),
+			oldRSs: []*exp.ReplicaSet{rs("foo-v2", 2, nil, oldTimestamp), rs("foo-v1", 1, nil, olderTimestamp)},
+
+			expectedNew: rs("foo-v3", 0, nil, newTimestamp),
+			expectedOld: []*exp.ReplicaSet{rs("foo-v2", 0, nil, oldTimestamp), rs("foo-v1", 0, nil, olderTimestamp)},
+		},
+		// Scales up from zero
+		{
+			name:          "proportional scaling: 0 -> 6",
+			deployment:    newDeployment(6, nil),
+			oldDeployment: newDeployment(0, nil),
+
+			newRS:  rs("foo-v3", 0, nil, newTimestamp),
+			oldRSs: []*exp.ReplicaSet{rs("foo-v2", 0, nil, oldTimestamp), rs("foo-v1", 0, nil, olderTimestamp)},
+
+			expectedNew: rs("foo-v3", 6, nil, newTimestamp),
+			expectedOld: []*exp.ReplicaSet{rs("foo-v2", 0, nil, oldTimestamp), rs("foo-v1", 0, nil, olderTimestamp)},
+		},
+		// Scenario: deployment.spec.replicas == 3 ( foo-v1.spec.replicas == foo-v2.spec.replicas == foo-v3.spec.replicas == 1 )
+		// Deployment is scaled to 5. foo-v3.spec.replicas and foo-v2.spec.replicas should increment by 1 but foo-v2 fails to
+		// update.
+		{
+			name:          "failed rs update",
+			deployment:    newDeployment(5, nil),
+			oldDeployment: newDeployment(5, nil),
+
+			newRS:  rs("foo-v3", 2, nil, newTimestamp),
+			oldRSs: []*exp.ReplicaSet{rs("foo-v2", 1, nil, oldTimestamp), rs("foo-v1", 1, nil, olderTimestamp)},
+
+			expectedNew: rs("foo-v3", 2, nil, newTimestamp),
+			expectedOld: []*exp.ReplicaSet{rs("foo-v2", 2, nil, oldTimestamp), rs("foo-v1", 1, nil, olderTimestamp)},
+
+			desiredReplicasAnnotations: map[string]int32{"foo-v2": int32(3)},
+		},
+		{
+			name:          "deployment with surge pods",
+			deployment:    newDeploymentEnhanced(20, intstr.FromInt(2)),
+			oldDeployment: newDeploymentEnhanced(10, intstr.FromInt(2)),
+
+			newRS:  rs("foo-v2", 6, nil, newTimestamp),
+			oldRSs: []*exp.ReplicaSet{rs("foo-v1", 6, nil, oldTimestamp)},
+
+			expectedNew: rs("foo-v2", 11, nil, newTimestamp),
+			expectedOld: []*exp.ReplicaSet{rs("foo-v1", 11, nil, oldTimestamp)},
+		},
+		{
+			name:          "change both surge and size",
+			deployment:    newDeploymentEnhanced(50, intstr.FromInt(6)),
+			oldDeployment: newDeploymentEnhanced(10, intstr.FromInt(3)),
+
+			newRS:  rs("foo-v2", 5, nil, newTimestamp),
+			oldRSs: []*exp.ReplicaSet{rs("foo-v1", 8, nil, oldTimestamp)},
+
+			expectedNew: rs("foo-v2", 22, nil, newTimestamp),
+			expectedOld: []*exp.ReplicaSet{rs("foo-v1", 34, nil, oldTimestamp)},
+		},
+	}
+
+	for _, test := range tests {
+		t.Log(test.name)
+		fake := fake.Clientset{}
+		dc := &DeploymentController{
+			client:        &fake,
+			eventRecorder: &record.FakeRecorder{},
+		}
+
+		if test.newRS != nil {
+			desiredReplicas := test.oldDeployment.Spec.Replicas
+			if desired, ok := test.desiredReplicasAnnotations[test.newRS.Name]; ok {
+				desiredReplicas = desired
+			}
+			setReplicasAnnotations(test.newRS, desiredReplicas, desiredReplicas+maxSurge(*test.oldDeployment))
+		}
+		for i := range test.oldRSs {
+			rs := test.oldRSs[i]
+			if rs == nil {
+				continue
+			}
+			desiredReplicas := test.oldDeployment.Spec.Replicas
+			if desired, ok := test.desiredReplicasAnnotations[rs.Name]; ok {
+				desiredReplicas = desired
+			}
+			setReplicasAnnotations(rs, desiredReplicas, desiredReplicas+maxSurge(*test.oldDeployment))
+		}
+
+		if err := dc.scale(test.deployment, test.newRS, test.oldRSs); err != nil {
+			t.Errorf("%s: unexpected error: %v", test.name, err)
+			continue
+		}
+		if test.expectedNew != nil && test.newRS != nil && test.expectedNew.Spec.Replicas != test.newRS.Spec.Replicas {
+			t.Errorf("%s: expected new replicas: %d, got: %d", test.name, test.expectedNew.Spec.Replicas, test.newRS.Spec.Replicas)
+			continue
+		}
+		if len(test.expectedOld) != len(test.oldRSs) {
+			t.Errorf("%s: expected %d old replica sets, got %d", test.name, len(test.expectedOld), len(test.oldRSs))
+			continue
+		}
+		for n := range test.oldRSs {
+			rs := test.oldRSs[n]
+			exp := test.expectedOld[n]
+			if exp.Spec.Replicas != rs.Spec.Replicas {
+				t.Errorf("%s: expected old (%s) replicas: %d, got: %d", test.name, rs.Name, exp.Spec.Replicas, rs.Spec.Replicas)
+			}
+		}
+	}
+}
+
+func TestDeploymentController_cleanupDeployment(t *testing.T) {
+	selector := map[string]string{"foo": "bar"}
+
+	tests := []struct {
+		oldRSs               []*exp.ReplicaSet
+		revisionHistoryLimit int
+		expectedDeletions    int
+	}{
+		{
+			oldRSs: []*exp.ReplicaSet{
+				newRSWithStatus("foo-1", 0, 0, selector),
+				newRSWithStatus("foo-2", 0, 0, selector),
+				newRSWithStatus("foo-3", 0, 0, selector),
+			},
+			revisionHistoryLimit: 1,
+			expectedDeletions:    2,
+		},
+		{
+			// Only delete the replica set with Spec.Replicas = Status.Replicas = 0.
+			oldRSs: []*exp.ReplicaSet{
+				newRSWithStatus("foo-1", 0, 0, selector),
+				newRSWithStatus("foo-2", 0, 1, selector),
+				newRSWithStatus("foo-3", 1, 0, selector),
+				newRSWithStatus("foo-4", 1, 1, selector),
+			},
+			revisionHistoryLimit: 0,
+			expectedDeletions:    1,
+		},
+		{
+			oldRSs: []*exp.ReplicaSet{
+				newRSWithStatus("foo-1", 0, 0, selector),
+				newRSWithStatus("foo-2", 0, 0, selector),
+			},
+			revisionHistoryLimit: 0,
+			expectedDeletions:    2,
+		},
+		{
+			oldRSs: []*exp.ReplicaSet{
+				newRSWithStatus("foo-1", 1, 1, selector),
+				newRSWithStatus("foo-2", 1, 1, selector),
+			},
+			revisionHistoryLimit: 0,
+			expectedDeletions:    0,
+		},
+	}
+
+	for i, test := range tests {
+		fake := &fake.Clientset{}
+		controller := NewDeploymentController(fake, controller.NoResyncPeriodFunc)
+
+		controller.eventRecorder = &record.FakeRecorder{}
+		controller.rsStoreSynced = alwaysReady
+		controller.podStoreSynced = alwaysReady
+		for _, rs := range test.oldRSs {
+			controller.rsStore.Add(rs)
+		}
+
+		d := newDeployment(1, &tests[i].revisionHistoryLimit)
+		controller.cleanupDeployment(test.oldRSs, d)
+
+		gotDeletions := 0
+		for _, action := range fake.Actions() {
+			if "delete" == action.GetVerb() {
+				gotDeletions++
+			}
+		}
+		if gotDeletions != test.expectedDeletions {
+			t.Errorf("expected %v old replica sets to be deleted, got %v", test.expectedDeletions, gotDeletions)
+			continue
+		}
+	}
+}
diff --git a/pkg/controller/deployment/util.go b/pkg/controller/deployment/util.go
index 06fbad3c874..4070e821790 100644
--- a/pkg/controller/deployment/util.go
+++ b/pkg/controller/deployment/util.go
@@ -23,12 +23,123 @@ import (
 	"github.com/golang/glog"
 
+	"k8s.io/kubernetes/pkg/api/annotations"
 	"k8s.io/kubernetes/pkg/apis/extensions"
 	"k8s.io/kubernetes/pkg/controller"
 	deploymentutil "k8s.io/kubernetes/pkg/util/deployment"
 	"k8s.io/kubernetes/pkg/util/integer"
 )
 
+func maxRevision(allRSs []*extensions.ReplicaSet) int64 {
+	max := int64(0)
+	for _, rs := range allRSs {
+		if v, err := deploymentutil.Revision(rs); err != nil {
+			// Skip the replica sets whose revision information failed to parse
+			glog.V(4).Infof("Error: %v. Couldn't parse revision for replica set %#v, deployment controller will skip it when reconciling revisions.", err, rs)
+		} else if v > max {
+			max = v
+		}
+	}
+	return max
+}
+
+// lastRevision finds the second max revision number in all replica sets (the last revision)
+func lastRevision(allRSs []*extensions.ReplicaSet) int64 {
+	max, secMax := int64(0), int64(0)
+	for _, rs := range allRSs {
+		if v, err := deploymentutil.Revision(rs); err != nil {
+			// Skip the replica sets whose revision information failed to parse
+			glog.V(4).Infof("Error: %v. Couldn't parse revision for replica set %#v, deployment controller will skip it when reconciling revisions.", err, rs)
+		} else if v >= max {
+			secMax = max
+			max = v
+		} else if v > secMax {
+			secMax = v
+		}
+	}
+	return secMax
+}
+
+// setNewReplicaSetAnnotations sets the new replica set's annotations appropriately by updating its revision and
+// copying required deployment annotations to it; it returns true if the replica set's annotations changed.
+func setNewReplicaSetAnnotations(deployment *extensions.Deployment, newRS *extensions.ReplicaSet, newRevision string, exists bool) bool {
+	// First, copy the deployment's annotations (except for the apply and revision annotations)
+	annotationChanged := copyDeploymentAnnotationsToReplicaSet(deployment, newRS)
+	// Then, update the replica set's revision annotation
+	if newRS.Annotations == nil {
+		newRS.Annotations = make(map[string]string)
+	}
+	// The newRS's revision should be the greatest among all RSes. Usually, its revision number is newRevision (the max revision number
+	// of all old RSes + 1). However, it's possible that some of the old RSes are deleted after the newRS revision is updated, and
+	// newRevision becomes smaller than the newRS's revision. We should only update the newRS revision when it's smaller than newRevision.
+	if newRS.Annotations[deploymentutil.RevisionAnnotation] < newRevision {
+		newRS.Annotations[deploymentutil.RevisionAnnotation] = newRevision
+		annotationChanged = true
+		glog.V(4).Infof("Updating replica set %q revision to %s", newRS.Name, newRevision)
+	}
+	if !exists && setReplicasAnnotations(newRS, deployment.Spec.Replicas, deployment.Spec.Replicas+maxSurge(*deployment)) {
+		annotationChanged = true
+	}
+	return annotationChanged
+}
+
+var annotationsToSkip = map[string]bool{
+	annotations.LastAppliedConfigAnnotation:  true,
+	deploymentutil.RevisionAnnotation:        true,
+	deploymentutil.DesiredReplicasAnnotation: true,
+	deploymentutil.MaxReplicasAnnotation:     true,
+}
+
+// skipCopyAnnotation returns true if we should skip copying the annotation with the given annotation key
+// TODO: How to decide which annotations should / should not be copied?
+// See https://github.com/kubernetes/kubernetes/pull/20035#issuecomment-179558615
+func skipCopyAnnotation(key string) bool {
+	return annotationsToSkip[key]
+}
+
+// copyDeploymentAnnotationsToReplicaSet copies the deployment's annotations to the replica set's annotations,
+// and returns true if the replica set's annotations changed.
+// Note that the apply and revision annotations are not copied.
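+// Specifically, annotations matched by annotationsToSkip above (the kubectl apply annotation and the
+// deployment revision, desired-replicas, and max-replicas annotations) are the ones left alone.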
+func copyDeploymentAnnotationsToReplicaSet(deployment *extensions.Deployment, rs *extensions.ReplicaSet) bool {
+	rsAnnotationsChanged := false
+	if rs.Annotations == nil {
+		rs.Annotations = make(map[string]string)
+	}
+	for k, v := range deployment.Annotations {
+		// The newRS revision is updated automatically in getNewReplicaSet, and the deployment's revision number is then updated
+		// by copying its newRS revision number. We should not copy the deployment's revision to its newRS, since updating the
+		// deployment's revision number may fail (the revision becomes stale) and the revision number in the newRS is more reliable.
+		if skipCopyAnnotation(k) || rs.Annotations[k] == v {
+			continue
+		}
+		rs.Annotations[k] = v
+		rsAnnotationsChanged = true
+	}
+	return rsAnnotationsChanged
+}
+
+// setDeploymentAnnotationsTo sets the deployment's annotations to the given RS's annotations.
+// This action should be done if and only if the deployment is rolling back to this rs.
+// Note that the apply and revision annotations are not changed.
+func setDeploymentAnnotationsTo(deployment *extensions.Deployment, rollbackToRS *extensions.ReplicaSet) {
+	deployment.Annotations = getSkippedAnnotations(deployment.Annotations)
+	for k, v := range rollbackToRS.Annotations {
+		if !skipCopyAnnotation(k) {
+			deployment.Annotations[k] = v
+		}
+	}
+}
+
+func getSkippedAnnotations(annotations map[string]string) map[string]string {
+	skippedAnnotations := make(map[string]string)
+	for k, v := range annotations {
+		if skipCopyAnnotation(k) {
+			skippedAnnotations[k] = v
+		}
+	}
+	return skippedAnnotations
+}
+
 // findActiveOrLatest returns the only active or the latest replica set in case there is at most one active
 // replica set. If there are more active replica sets, then we should proportionally scale them.
 func findActiveOrLatest(newRS *extensions.ReplicaSet, oldRSs []*extensions.ReplicaSet) *extensions.ReplicaSet {