Merge pull request #20273 from kargakis/allow-scaling-paused-deployments

Automatic merge from submit-queue

Proportionally scale paused and rolling deployments

Enable paused and rolling deployments to be proportionally scaled.
Also have cleanup policy work for paused deployments.

Fixes #20853
Fixes #20966
Fixes #20754

@bgrant0607 @janetkuo @ironcladlou @nikhiljindal

<!-- Reviewable:start -->
---
This change is [<img src="http://reviewable.k8s.io/review_button.svg" height="35" align="absmiddle" alt="Reviewable"/>](http://reviewable.k8s.io/reviews/kubernetes/kubernetes/20273)
<!-- Reviewable:end -->
This commit is contained in:
k8s-merge-robot 2016-06-24 19:41:51 -07:00 committed by GitHub
commit fc1937f68f
13 changed files with 921 additions and 129 deletions

View File

@ -1772,35 +1772,35 @@ __EOF__
# Command
# Create a deployment (revision 1)
kubectl create -f hack/testdata/deployment-revision1.yaml "${kube_flags[@]}"
kube::test::get_object_assert deployment "{{range.items}}{{$id_field}}:{{end}}" 'nginx-deployment:'
kube::test::get_object_assert deployment "{{range.items}}{{$id_field}}:{{end}}" 'nginx:'
kube::test::get_object_assert deployment "{{range.items}}{{$deployment_image_field}}:{{end}}" "${IMAGE_DEPLOYMENT_R1}:"
# Rollback to revision 1 - should be no-op
kubectl rollout undo deployment nginx-deployment --to-revision=1 "${kube_flags[@]}"
kubectl rollout undo deployment nginx --to-revision=1 "${kube_flags[@]}"
kube::test::get_object_assert deployment "{{range.items}}{{$deployment_image_field}}:{{end}}" "${IMAGE_DEPLOYMENT_R1}:"
# Update the deployment (revision 2)
kubectl apply -f hack/testdata/deployment-revision2.yaml "${kube_flags[@]}"
kube::test::get_object_assert deployment.extensions "{{range.items}}{{$deployment_image_field}}:{{end}}" "${IMAGE_DEPLOYMENT_R2}:"
# Rollback to revision 1
kubectl rollout undo deployment nginx-deployment --to-revision=1 "${kube_flags[@]}"
kubectl rollout undo deployment nginx --to-revision=1 "${kube_flags[@]}"
sleep 1
kube::test::get_object_assert deployment "{{range.items}}{{$deployment_image_field}}:{{end}}" "${IMAGE_DEPLOYMENT_R1}:"
# Rollback to revision 1000000 - should be no-op
kubectl rollout undo deployment nginx-deployment --to-revision=1000000 "${kube_flags[@]}"
kubectl rollout undo deployment nginx --to-revision=1000000 "${kube_flags[@]}"
kube::test::get_object_assert deployment "{{range.items}}{{$deployment_image_field}}:{{end}}" "${IMAGE_DEPLOYMENT_R1}:"
# Rollback to last revision
kubectl rollout undo deployment nginx-deployment "${kube_flags[@]}"
kubectl rollout undo deployment nginx "${kube_flags[@]}"
sleep 1
kube::test::get_object_assert deployment "{{range.items}}{{$deployment_image_field}}:{{end}}" "${IMAGE_DEPLOYMENT_R2}:"
# Pause the deployment
kubectl-with-retry rollout pause deployment nginx-deployment "${kube_flags[@]}"
kubectl-with-retry rollout pause deployment nginx "${kube_flags[@]}"
# A paused deployment cannot be rolled back
! kubectl rollout undo deployment nginx-deployment "${kube_flags[@]}"
! kubectl rollout undo deployment nginx "${kube_flags[@]}"
# Resume the deployment
kubectl-with-retry rollout resume deployment nginx-deployment "${kube_flags[@]}"
kubectl-with-retry rollout resume deployment nginx "${kube_flags[@]}"
# The resumed deployment can now be rolled back
kubectl rollout undo deployment nginx-deployment "${kube_flags[@]}"
kubectl rollout undo deployment nginx "${kube_flags[@]}"
# Clean up
kubectl delete deployment nginx-deployment "${kube_flags[@]}"
kubectl delete deployment nginx "${kube_flags[@]}"
### Set image of a deployment
# Pre-condition: no deployment exists

View File

@ -1,18 +1,18 @@
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
name: nginx-deployment
name: nginx
labels:
name: nginx-deployment
name: nginx-undo
spec:
replicas: 3
selector:
matchLabels:
name: nginx
name: nginx-undo
template:
metadata:
labels:
name: nginx
name: nginx-undo
spec:
containers:
- name: nginx

View File

@ -1,18 +1,18 @@
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
name: nginx-deployment
name: nginx
labels:
name: nginx-deployment
name: nginx-undo
spec:
replicas: 3
selector:
matchLabels:
name: nginx
name: nginx-undo
template:
metadata:
labels:
name: nginx
name: nginx-undo
spec:
containers:
- name: nginx

View File

@ -147,12 +147,15 @@ var ValidateDeploymentName = apivalidation.NameIsDNSSubdomain
func ValidatePositiveIntOrPercent(intOrPercent intstr.IntOrString, fldPath *field.Path) field.ErrorList {
allErrs := field.ErrorList{}
if intOrPercent.Type == intstr.String {
switch intOrPercent.Type {
case intstr.String:
if !validation.IsValidPercent(intOrPercent.StrVal) {
allErrs = append(allErrs, field.Invalid(fldPath, intOrPercent, "must be an integer or percentage (e.g '5%')"))
allErrs = append(allErrs, field.Invalid(fldPath, intOrPercent, "must be an integer or percentage (e.g '5%%')"))
}
} else if intOrPercent.Type == intstr.Int {
case intstr.Int:
allErrs = append(allErrs, apivalidation.ValidateNonnegativeField(int64(intOrPercent.IntValue()), fldPath)...)
default:
allErrs = append(allErrs, field.Invalid(fldPath, intOrPercent, "must be an integer or percentage (e.g '5%%')"))
}
return allErrs
}

View File

@ -619,7 +619,9 @@ func IsPodActive(p api.Pod) bool {
func FilterActiveReplicaSets(replicaSets []*extensions.ReplicaSet) []*extensions.ReplicaSet {
active := []*extensions.ReplicaSet{}
for i := range replicaSets {
if replicaSets[i].Spec.Replicas > 0 {
rs := replicaSets[i]
if rs != nil && rs.Spec.Replicas > 0 {
active = append(active, replicaSets[i])
}
}
@ -639,7 +641,6 @@ type ControllersByCreationTimestamp []*api.ReplicationController
func (o ControllersByCreationTimestamp) Len() int { return len(o) }
func (o ControllersByCreationTimestamp) Swap(i, j int) { o[i], o[j] = o[j], o[i] }
func (o ControllersByCreationTimestamp) Less(i, j int) bool {
if o[i].CreationTimestamp.Equal(o[j].CreationTimestamp) {
return o[i].Name < o[j].Name
@ -647,15 +648,40 @@ func (o ControllersByCreationTimestamp) Less(i, j int) bool {
return o[i].CreationTimestamp.Before(o[j].CreationTimestamp)
}
// ReplicaSetsByCreationTimestamp sorts a list of ReplicationSets by creation timestamp, using their names as a tie breaker.
// ReplicaSetsByCreationTimestamp sorts a list of ReplicaSet by creation timestamp, using their names as a tie breaker.
type ReplicaSetsByCreationTimestamp []*extensions.ReplicaSet
func (o ReplicaSetsByCreationTimestamp) Len() int { return len(o) }
func (o ReplicaSetsByCreationTimestamp) Swap(i, j int) { o[i], o[j] = o[j], o[i] }
func (o ReplicaSetsByCreationTimestamp) Less(i, j int) bool {
if o[i].CreationTimestamp.Equal(o[j].CreationTimestamp) {
return o[i].Name < o[j].Name
}
return o[i].CreationTimestamp.Before(o[j].CreationTimestamp)
}
// ReplicaSetsBySizeOlder sorts a list of ReplicaSet by size in descending order, using their creation timestamp or name as a tie breaker.
// By using the creation timestamp, this sorts from old to new replica sets.
type ReplicaSetsBySizeOlder []*extensions.ReplicaSet
func (o ReplicaSetsBySizeOlder) Len() int { return len(o) }
func (o ReplicaSetsBySizeOlder) Swap(i, j int) { o[i], o[j] = o[j], o[i] }
func (o ReplicaSetsBySizeOlder) Less(i, j int) bool {
if o[i].Spec.Replicas == o[j].Spec.Replicas {
return ReplicaSetsByCreationTimestamp(o).Less(i, j)
}
return o[i].Spec.Replicas > o[j].Spec.Replicas
}
// ReplicaSetsBySizeNewer sorts a list of ReplicaSet by size in descending order, using their creation timestamp or name as a tie breaker.
// By using the creation timestamp, this sorts from new to old replica sets.
type ReplicaSetsBySizeNewer []*extensions.ReplicaSet
func (o ReplicaSetsBySizeNewer) Len() int { return len(o) }
func (o ReplicaSetsBySizeNewer) Swap(i, j int) { o[i], o[j] = o[j], o[i] }
func (o ReplicaSetsBySizeNewer) Less(i, j int) bool {
if o[i].Spec.Replicas == o[j].Spec.Replicas {
return ReplicaSetsByCreationTimestamp(o).Less(j, i)
}
return o[i].Spec.Replicas > o[j].Spec.Replicas
}

View File

@ -438,13 +438,9 @@ func (dc *DeploymentController) syncDeployment(key string) error {
}
if d.Spec.Paused {
// TODO: Implement scaling for paused deployments.
// Don't take any action for paused deployment.
// But keep the status up-to-date.
// Ignore paused deployments
glog.V(4).Infof("Updating status only for paused deployment %s/%s", d.Namespace, d.Name)
return dc.syncPausedDeploymentStatus(d)
return dc.sync(d)
}
if d.Spec.RollbackTo != nil {
revision := d.Spec.RollbackTo.Revision
if _, err = dc.rollback(d, &revision); err != nil {
@ -452,27 +448,135 @@ func (dc *DeploymentController) syncDeployment(key string) error {
}
}
if dc.isScalingEvent(d) {
return dc.sync(d)
}
switch d.Spec.Strategy.Type {
case extensions.RecreateDeploymentStrategyType:
return dc.syncRecreateDeployment(d)
return dc.rolloutRecreate(d)
case extensions.RollingUpdateDeploymentStrategyType:
return dc.syncRollingUpdateDeployment(d)
return dc.rolloutRolling(d)
}
return fmt.Errorf("unexpected deployment strategy type: %s", d.Spec.Strategy.Type)
}
// Updates the status of a paused deployment
func (dc *DeploymentController) syncPausedDeploymentStatus(deployment *extensions.Deployment) error {
// sync is responsible for reconciling deployments on scaling events or when they
// are paused.
func (dc *DeploymentController) sync(deployment *extensions.Deployment) error {
newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(deployment, false)
if err != nil {
return err
}
allRSs := append(controller.FilterActiveReplicaSets(oldRSs), newRS)
if err := dc.scale(deployment, newRS, oldRSs); err != nil {
// If we get an error while trying to scale, the deployment will be requeued
// so we can abort this resync
return err
}
dc.cleanupDeployment(oldRSs, deployment)
// Sync deployment status
allRSs := append(oldRSs, newRS)
return dc.syncDeploymentStatus(allRSs, newRS, deployment)
}
// scale scales proportionally in order to mitigate risk. Otherwise, scaling up can increase the size
// of the new replica set and scaling down can decrease the sizes of the old ones, both of which would
// have the effect of hastening the rollout progress, which could produce a higher proportion of unavailable
// replicas in the event of a problem with the rolled out template. Should run only on scaling events or
// when a deployment is paused and not during the normal rollout process.
func (dc *DeploymentController) scale(deployment *extensions.Deployment, newRS *extensions.ReplicaSet, oldRSs []*extensions.ReplicaSet) error {
// If there is only one active replica set then we should scale that up to the full count of the
// deployment. If there is no active replica set, then we should scale up the newest replica set.
if activeOrLatest := findActiveOrLatest(newRS, oldRSs); activeOrLatest != nil {
if activeOrLatest.Spec.Replicas == deployment.Spec.Replicas {
return nil
}
_, _, err := dc.scaleReplicaSetAndRecordEvent(activeOrLatest, deployment.Spec.Replicas, deployment)
return err
}
// If the new replica set is saturated, old replica sets should be fully scaled down.
// This case handles replica set adoption during a saturated new replica set.
if deploymentutil.IsSaturated(deployment, newRS) {
for _, old := range controller.FilterActiveReplicaSets(oldRSs) {
if _, _, err := dc.scaleReplicaSetAndRecordEvent(old, 0, deployment); err != nil {
return err
}
}
return nil
}
// There are old replica sets with pods and the new replica set is not saturated.
// We need to proportionally scale all replica sets (new and old) in case of a
// rolling deployment.
if deploymentutil.IsRollingUpdate(deployment) {
allRSs := controller.FilterActiveReplicaSets(append(oldRSs, newRS))
allRSsReplicas := deploymentutil.GetReplicaCountForReplicaSets(allRSs)
allowedSize := int32(0)
if deployment.Spec.Replicas > 0 {
allowedSize = deployment.Spec.Replicas + maxSurge(*deployment)
}
// Number of additional replicas that can be either added or removed from the total
// replicas count. These replicas should be distributed proportionally to the active
// replica sets.
deploymentReplicasToAdd := allowedSize - allRSsReplicas
// The additional replicas should be distributed proportionally amongst the active
// replica sets from the larger to the smaller in size replica set. Scaling direction
// drives what happens in case we are trying to scale replica sets of the same size.
// In such a case when scaling up, we should scale up newer replica sets first, and
// when scaling down, we should scale down older replica sets first.
scalingOperation := "up"
switch {
case deploymentReplicasToAdd > 0:
sort.Sort(controller.ReplicaSetsBySizeNewer(allRSs))
case deploymentReplicasToAdd < 0:
sort.Sort(controller.ReplicaSetsBySizeOlder(allRSs))
scalingOperation = "down"
default: /* deploymentReplicasToAdd == 0 */
// Nothing to add.
return nil
}
// Iterate over all active replica sets and estimate proportions for each of them.
// The absolute value of deploymentReplicasAdded should never exceed the absolute
// value of deploymentReplicasToAdd.
deploymentReplicasAdded := int32(0)
for i := range allRSs {
rs := allRSs[i]
proportion := getProportion(rs, *deployment, deploymentReplicasToAdd, deploymentReplicasAdded)
rs.Spec.Replicas += proportion
deploymentReplicasAdded += proportion
}
// Update all replica sets
for i := range allRSs {
rs := allRSs[i]
// Add/remove any leftovers to the largest replica set.
if i == 0 {
leftover := deploymentReplicasToAdd - deploymentReplicasAdded
rs.Spec.Replicas += leftover
if rs.Spec.Replicas < 0 {
rs.Spec.Replicas = 0
}
}
if _, err := dc.scaleReplicaSet(rs, rs.Spec.Replicas, deployment, scalingOperation); err != nil {
// Return as soon as we fail, the deployment is requeued
return err
}
}
}
return nil
}
// Rolling back to a revision; no-op if the toRevision is deployment's current revision
func (dc *DeploymentController) rollback(deployment *extensions.Deployment, toRevision *int64) (*extensions.Deployment, error) {
newRS, allOldRSs, err := dc.getAllReplicaSetsAndSyncRevision(deployment, true)
@ -526,13 +630,13 @@ func (dc *DeploymentController) updateDeploymentAndClearRollbackTo(deployment *e
return dc.updateDeployment(deployment)
}
func (dc *DeploymentController) syncRecreateDeployment(deployment *extensions.Deployment) error {
func (dc *DeploymentController) rolloutRecreate(deployment *extensions.Deployment) error {
// Don't create a new RS if not already existed, so that we avoid scaling up before scaling down
newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(deployment, false)
if err != nil {
return err
}
allRSs := append(controller.FilterActiveReplicaSets(oldRSs), newRS)
allRSs := append(oldRSs, newRS)
// scale down old replica sets
scaledDown, err := dc.scaleDownOldReplicaSetsForRecreate(controller.FilterActiveReplicaSets(oldRSs), deployment)
@ -564,21 +668,18 @@ func (dc *DeploymentController) syncRecreateDeployment(deployment *extensions.De
return dc.updateDeploymentStatus(allRSs, newRS, deployment)
}
if deployment.Spec.RevisionHistoryLimit != nil {
// Cleanup old replica sets
dc.cleanupOldReplicaSets(oldRSs, deployment)
}
dc.cleanupDeployment(oldRSs, deployment)
// Sync deployment status
return dc.syncDeploymentStatus(allRSs, newRS, deployment)
}
func (dc *DeploymentController) syncRollingUpdateDeployment(deployment *extensions.Deployment) error {
func (dc *DeploymentController) rolloutRolling(deployment *extensions.Deployment) error {
newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(deployment, true)
if err != nil {
return err
}
allRSs := append(controller.FilterActiveReplicaSets(oldRSs), newRS)
allRSs := append(oldRSs, newRS)
// Scale up, if we can.
scaledUp, err := dc.reconcileNewReplicaSet(allRSs, newRS, deployment)
@ -600,10 +701,7 @@ func (dc *DeploymentController) syncRollingUpdateDeployment(deployment *extensio
return dc.updateDeploymentStatus(allRSs, newRS, deployment)
}
if deployment.Spec.RevisionHistoryLimit != nil {
// Cleanup old replicas sets
dc.cleanupOldReplicaSets(oldRSs, deployment)
}
dc.cleanupDeployment(oldRSs, deployment)
// Sync deployment status
return dc.syncDeploymentStatus(allRSs, newRS, deployment)
@ -611,11 +709,11 @@ func (dc *DeploymentController) syncRollingUpdateDeployment(deployment *extensio
// syncDeploymentStatus checks if the status is up-to-date and sync it if necessary
func (dc *DeploymentController) syncDeploymentStatus(allRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, d *extensions.Deployment) error {
totalActualReplicas, updatedReplicas, availableReplicas, _, err := dc.calculateStatus(allRSs, newRS, d)
newStatus, err := dc.calculateStatus(allRSs, newRS, d)
if err != nil {
return err
}
if d.Generation > d.Status.ObservedGeneration || d.Status.Replicas != totalActualReplicas || d.Status.UpdatedReplicas != updatedReplicas || d.Status.AvailableReplicas != availableReplicas {
if !reflect.DeepEqual(d.Status, newStatus) {
return dc.updateDeploymentStatus(allRSs, newRS, d)
}
return nil
@ -626,6 +724,8 @@ func (dc *DeploymentController) syncDeploymentStatus(allRSs []*extensions.Replic
// 2. Get new RS this deployment targets (whose pod template matches deployment's), and update new RS's revision number to (maxOldV + 1),
// only if its revision number is smaller than (maxOldV + 1). If this step failed, we'll update it in the next deployment sync loop.
// 3. Copy new RS's revision number to deployment (update deployment's revision). If this step failed, we'll update it in the next deployment sync loop.
// Note that currently the deployment controller is using caches to avoid querying the server for reads.
// This may lead to stale reads of replica sets, thus incorrect deployment status.
func (dc *DeploymentController) getAllReplicaSetsAndSyncRevision(deployment *extensions.Deployment, createIfNotExisted bool) (*extensions.ReplicaSet, []*extensions.ReplicaSet, error) {
// List the deployment's RSes & Pods and apply pod-template-hash info to deployment's adopted RSes/Pods
rsList, podList, err := dc.rsAndPodsWithHashKeySynced(deployment)
@ -701,7 +801,7 @@ func (dc *DeploymentController) getNewReplicaSet(deployment *extensions.Deployme
return nil, err
} else if existingNewRS != nil {
// Set existing new replica set's annotation
if setNewReplicaSetAnnotations(deployment, existingNewRS, newRevision) {
if setNewReplicaSetAnnotations(deployment, existingNewRS, newRevision, true) {
return dc.client.Extensions().ReplicaSets(deployment.ObjectMeta.Namespace).Update(existingNewRS)
}
return existingNewRS, nil
@ -731,8 +831,6 @@ func (dc *DeploymentController) getNewReplicaSet(deployment *extensions.Deployme
Template: newRSTemplate,
},
}
// Set new replica set's annotation
setNewReplicaSetAnnotations(deployment, &newRS, newRevision)
allRSs := append(oldRSs, &newRS)
newReplicasCount, err := deploymentutil.NewRSNewReplicas(deployment, allRSs, &newRS)
if err != nil {
@ -740,6 +838,8 @@ func (dc *DeploymentController) getNewReplicaSet(deployment *extensions.Deployme
}
newRS.Spec.Replicas = newReplicasCount
// Set new replica set's annotation
setNewReplicaSetAnnotations(deployment, &newRS, newRevision, false)
createdRS, err := dc.client.Extensions().ReplicaSets(namespace).Create(&newRS)
if err != nil {
dc.enqueueDeployment(deployment)
@ -881,7 +981,7 @@ func (dc *DeploymentController) addHashKeyToRSAndPods(rs extensions.ReplicaSet)
// setNewReplicaSetAnnotations sets new replica set's annotations appropriately by updating its revision and
// copying required deployment annotations to it; it returns true if replica set's annotation is changed.
func setNewReplicaSetAnnotations(deployment *extensions.Deployment, newRS *extensions.ReplicaSet, newRevision string) bool {
func setNewReplicaSetAnnotations(deployment *extensions.Deployment, newRS *extensions.ReplicaSet, newRevision string, exists bool) bool {
// First, copy deployment's annotations (except for apply and revision annotations)
annotationChanged := copyDeploymentAnnotationsToReplicaSet(deployment, newRS)
// Then, update replica set's revision annotation
@ -894,17 +994,26 @@ func setNewReplicaSetAnnotations(deployment *extensions.Deployment, newRS *exten
if newRS.Annotations[deploymentutil.RevisionAnnotation] < newRevision {
newRS.Annotations[deploymentutil.RevisionAnnotation] = newRevision
annotationChanged = true
glog.V(4).Infof("updating replica set %q's revision to %s - %+v\n", newRS.Name, newRevision, newRS)
glog.V(4).Infof("Updating replica set %q revision to %s", newRS.Name, newRevision)
}
if !exists && setReplicasAnnotations(newRS, deployment.Spec.Replicas, deployment.Spec.Replicas+maxSurge(*deployment)) {
annotationChanged = true
}
return annotationChanged
}
var annotationsToSkip = map[string]bool{
annotations.LastAppliedConfigAnnotation: true,
deploymentutil.RevisionAnnotation: true,
deploymentutil.DesiredReplicasAnnotation: true,
deploymentutil.MaxReplicasAnnotation: true,
}
// skipCopyAnnotation returns true if we should skip copying the annotation with the given annotation key
// TODO: How to decide which annotations should / should not be copied?
// See https://github.com/kubernetes/kubernetes/pull/20035#issuecomment-179558615
func skipCopyAnnotation(key string) bool {
// Skip apply annotations and revision annotations.
return key == annotations.LastAppliedConfigAnnotation || key == deploymentutil.RevisionAnnotation
return annotationsToSkip[key]
}
func getSkippedAnnotations(annotations map[string]string) map[string]string {
@ -980,12 +1089,12 @@ func (dc *DeploymentController) reconcileNewReplicaSet(allRSs []*extensions.Repl
return scaled, err
}
func (dc *DeploymentController) getAvailablePodsForReplicaSets(deployment *extensions.Deployment, rss []*extensions.ReplicaSet, minReadySeconds int32) (int32, error) {
func (dc *DeploymentController) getAvailablePodsForReplicaSets(deployment *extensions.Deployment, rss []*extensions.ReplicaSet) (int32, error) {
podList, err := dc.listPods(deployment)
if err != nil {
return 0, err
}
return deploymentutil.CountAvailablePodsForReplicaSets(podList, rss, minReadySeconds)
return deploymentutil.CountAvailablePodsForReplicaSets(podList, rss, deployment.Spec.MinReadySeconds)
}
func (dc *DeploymentController) reconcileOldReplicaSets(allRSs []*extensions.ReplicaSet, oldRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, deployment *extensions.Deployment) (bool, error) {
@ -1002,11 +1111,7 @@ func (dc *DeploymentController) reconcileOldReplicaSets(allRSs []*extensions.Rep
if err != nil {
return false, fmt.Errorf("could not find available pods: %v", err)
}
_, maxUnavailable, err := deploymentutil.ResolveFenceposts(&deployment.Spec.Strategy.RollingUpdate.MaxSurge, &deployment.Spec.Strategy.RollingUpdate.MaxUnavailable, deployment.Spec.Replicas)
if err != nil {
return false, err
}
maxUnavailable := maxUnavailable(*deployment)
// Check if we can scale down. We can scale down in the following 2 cases:
// * Some old replica sets have unhealthy replicas, we could safely scale down those unhealthy replicas since that won't further
@ -1108,10 +1213,7 @@ func (dc *DeploymentController) cleanupUnhealthyReplicas(oldRSs []*extensions.Re
// scaleDownOldReplicaSetsForRollingUpdate scales down old replica sets when deployment strategy is "RollingUpdate".
// Need check maxUnavailable to ensure availability
func (dc *DeploymentController) scaleDownOldReplicaSetsForRollingUpdate(allRSs []*extensions.ReplicaSet, oldRSs []*extensions.ReplicaSet, deployment *extensions.Deployment) (int32, error) {
_, maxUnavailable, err := deploymentutil.ResolveFenceposts(&deployment.Spec.Strategy.RollingUpdate.MaxSurge, &deployment.Spec.Strategy.RollingUpdate.MaxUnavailable, deployment.Spec.Replicas)
if err != nil {
return 0, err
}
maxUnavailable := maxUnavailable(*deployment)
// Check if we can scale down.
minAvailable := deployment.Spec.Replicas - maxUnavailable
@ -1183,7 +1285,13 @@ func (dc *DeploymentController) scaleUpNewReplicaSetForRecreate(newRS *extension
return scaled, err
}
func (dc *DeploymentController) cleanupOldReplicaSets(oldRSs []*extensions.ReplicaSet, deployment *extensions.Deployment) error {
// cleanupDeployment is responsible for cleaning up a deployment ie. retains all but the latest N old replica sets
// where N=d.Spec.RevisionHistoryLimit. Old replica sets are older versions of the podtemplate of a deployment kept
// around by default 1) for historical reasons and 2) for the ability to rollback a deployment.
func (dc *DeploymentController) cleanupDeployment(oldRSs []*extensions.ReplicaSet, deployment *extensions.Deployment) error {
if deployment.Spec.RevisionHistoryLimit == nil {
return nil
}
diff := int32(len(oldRSs)) - *deployment.Spec.RevisionHistoryLimit
if diff <= 0 {
return nil
@ -1209,39 +1317,31 @@ func (dc *DeploymentController) cleanupOldReplicaSets(oldRSs []*extensions.Repli
}
func (dc *DeploymentController) updateDeploymentStatus(allRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, deployment *extensions.Deployment) error {
totalActualReplicas, updatedReplicas, availableReplicas, unavailableReplicas, err := dc.calculateStatus(allRSs, newRS, deployment)
newStatus, err := dc.calculateStatus(allRSs, newRS, deployment)
if err != nil {
return err
}
newDeployment := *deployment
// TODO: Reconcile this with API definition. API definition talks about ready pods, while this just computes created pods.
newDeployment.Status = extensions.DeploymentStatus{
// TODO: Ensure that if we start retrying status updates, we won't pick up a new Generation value.
ObservedGeneration: deployment.Generation,
Replicas: totalActualReplicas,
UpdatedReplicas: updatedReplicas,
AvailableReplicas: availableReplicas,
UnavailableReplicas: unavailableReplicas,
}
_, err = dc.client.Extensions().Deployments(deployment.ObjectMeta.Namespace).UpdateStatus(&newDeployment)
if err == nil {
glog.V(4).Infof("Updated deployment %s status: %+v", deployment.Name, newDeployment.Status)
}
newDeployment := deployment
newDeployment.Status = newStatus
_, err = dc.client.Extensions().Deployments(deployment.Namespace).UpdateStatus(newDeployment)
return err
}
func (dc *DeploymentController) calculateStatus(allRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, deployment *extensions.Deployment) (totalActualReplicas, updatedReplicas, availableReplicas, unavailableReplicas int32, err error) {
totalActualReplicas = deploymentutil.GetActualReplicaCountForReplicaSets(allRSs)
updatedReplicas = deploymentutil.GetActualReplicaCountForReplicaSets([]*extensions.ReplicaSet{newRS})
minReadySeconds := deployment.Spec.MinReadySeconds
availableReplicas, err = dc.getAvailablePodsForReplicaSets(deployment, allRSs, minReadySeconds)
func (dc *DeploymentController) calculateStatus(allRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, deployment *extensions.Deployment) (extensions.DeploymentStatus, error) {
availableReplicas, err := dc.getAvailablePodsForReplicaSets(deployment, allRSs)
if err != nil {
err = fmt.Errorf("failed to count available pods: %v", err)
return
return deployment.Status, fmt.Errorf("failed to count available pods: %v", err)
}
totalReplicas := deploymentutil.GetReplicaCountForReplicaSets(allRSs)
unavailableReplicas = totalReplicas - availableReplicas
return
return extensions.DeploymentStatus{
// TODO: Ensure that if we start retrying status updates, we won't pick up a new Generation value.
ObservedGeneration: deployment.Generation,
Replicas: deploymentutil.GetActualReplicaCountForReplicaSets(allRSs),
UpdatedReplicas: deploymentutil.GetActualReplicaCountForReplicaSets([]*extensions.ReplicaSet{newRS}),
AvailableReplicas: availableReplicas,
UnavailableReplicas: totalReplicas - availableReplicas,
}, nil
}
func (dc *DeploymentController) scaleReplicaSetAndRecordEvent(rs *extensions.ReplicaSet, newScale int32, deployment *extensions.Deployment) (bool, *extensions.ReplicaSet, error) {
@ -1255,24 +1355,25 @@ func (dc *DeploymentController) scaleReplicaSetAndRecordEvent(rs *extensions.Rep
} else {
scalingOperation = "down"
}
newRS, err := dc.scaleReplicaSet(rs, newScale)
if err == nil {
dc.eventRecorder.Eventf(deployment, api.EventTypeNormal, "ScalingReplicaSet", "Scaled %s replica set %s to %d", scalingOperation, rs.Name, newScale)
} else {
dc.enqueueDeployment(deployment)
}
newRS, err := dc.scaleReplicaSet(rs, newScale, deployment, scalingOperation)
return true, newRS, err
}
func (dc *DeploymentController) scaleReplicaSet(rs *extensions.ReplicaSet, newScale int32) (*extensions.ReplicaSet, error) {
// TODO: Using client for now, update to use store when it is ready.
func (dc *DeploymentController) scaleReplicaSet(rs *extensions.ReplicaSet, newScale int32, deployment *extensions.Deployment, scalingOperation string) (*extensions.ReplicaSet, error) {
// NOTE: This mutates the ReplicaSet passed in. Not sure if that's a good idea.
rs.Spec.Replicas = newScale
return dc.client.Extensions().ReplicaSets(rs.ObjectMeta.Namespace).Update(rs)
setReplicasAnnotations(rs, deployment.Spec.Replicas, deployment.Spec.Replicas+maxSurge(*deployment))
rs, err := dc.client.Extensions().ReplicaSets(rs.ObjectMeta.Namespace).Update(rs)
if err == nil {
dc.eventRecorder.Eventf(deployment, api.EventTypeNormal, "ScalingReplicaSet", "Scaled %s replica set %s to %d", scalingOperation, rs.Name, newScale)
} else {
glog.Warningf("Cannot update replica set %q: %v", rs.Name, err)
dc.enqueueDeployment(deployment)
}
return rs, err
}
func (dc *DeploymentController) updateDeployment(deployment *extensions.Deployment) (*extensions.Deployment, error) {
// TODO: Using client for now, update to use store when it is ready.
return dc.client.Extensions().Deployments(deployment.ObjectMeta.Namespace).Update(deployment)
}
@ -1300,3 +1401,28 @@ func (dc *DeploymentController) rollbackToTemplate(deployment *extensions.Deploy
d, err = dc.updateDeploymentAndClearRollbackTo(deployment)
return
}
// isScalingEvent checks whether the provided deployment has been updated with a scaling event
// by looking at the desired-replicas annotation in the active replica sets of the deployment.
func (dc *DeploymentController) isScalingEvent(d *extensions.Deployment) bool {
newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, false)
if err != nil {
return false
}
// If there is no new replica set matching this deployment and the deployment isn't paused
// then there is a new rollout that waits to happen
if newRS == nil && !d.Spec.Paused {
return false
}
allRSs := append(oldRSs, newRS)
for _, rs := range controller.FilterActiveReplicaSets(allRSs) {
desired, ok := getDesiredReplicasAnnotation(rs)
if !ok {
continue
}
if desired != d.Spec.Replicas {
return true
}
}
return false
}

View File

@ -19,6 +19,7 @@ package deployment
import (
"fmt"
"testing"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/testapi"
@ -33,10 +34,16 @@ import (
"k8s.io/kubernetes/pkg/util/intstr"
)
func rs(name string, replicas int, selector map[string]string) *exp.ReplicaSet {
var (
alwaysReady = func() bool { return true }
noTimestamp = unversioned.Time{}
)
func rs(name string, replicas int, selector map[string]string, timestamp unversioned.Time) *exp.ReplicaSet {
return &exp.ReplicaSet{
ObjectMeta: api.ObjectMeta{
Name: name,
Name: name,
CreationTimestamp: timestamp,
},
Spec: exp.ReplicaSetSpec{
Replicas: int32(replicas),
@ -47,7 +54,7 @@ func rs(name string, replicas int, selector map[string]string) *exp.ReplicaSet {
}
func newRSWithStatus(name string, specReplicas, statusReplicas int, selector map[string]string) *exp.ReplicaSet {
rs := rs(name, specReplicas, selector)
rs := rs(name, specReplicas, selector, noTimestamp)
rs.Status = exp.ReplicaSetStatus{
Replicas: int32(statusReplicas),
}
@ -73,8 +80,6 @@ func deployment(name string, replicas int, maxSurge, maxUnavailable intstr.IntOr
}
}
var alwaysReady = func() bool { return true }
func newDeployment(replicas int, revisionHistoryLimit *int) *exp.Deployment {
var v *int32
if revisionHistoryLimit != nil {
@ -117,6 +122,13 @@ func newDeployment(replicas int, revisionHistoryLimit *int) *exp.Deployment {
return &d
}
// TODO: Consolidate all deployment helpers into one.
func newDeploymentEnhanced(replicas int, maxSurge intstr.IntOrString) *exp.Deployment {
d := newDeployment(replicas, nil)
d.Spec.Strategy.RollingUpdate.MaxSurge = maxSurge
return d
}
func newReplicaSet(d *exp.Deployment, name string, replicas int) *exp.ReplicaSet {
return &exp.ReplicaSet{
ObjectMeta: api.ObjectMeta{
@ -128,13 +140,262 @@ func newReplicaSet(d *exp.Deployment, name string, replicas int) *exp.ReplicaSet
Template: d.Spec.Template,
},
}
}
func newListOptions() api.ListOptions {
return api.ListOptions{}
}
// TestScale tests proportional scaling of deployments. Note that fenceposts for
// rolling out (maxUnavailable, maxSurge) have no meaning for simple scaling other
// than recording maxSurge as part of the max-replicas annotation that is taken
// into account in the next scale event (max-replicas is used for calculating the
// proportion of a replica set).
func TestScale(t *testing.T) {
newTimestamp := unversioned.Date(2016, 5, 20, 2, 0, 0, 0, time.UTC)
oldTimestamp := unversioned.Date(2016, 5, 20, 1, 0, 0, 0, time.UTC)
olderTimestamp := unversioned.Date(2016, 5, 20, 0, 0, 0, 0, time.UTC)
tests := []struct {
name string
deployment *exp.Deployment
oldDeployment *exp.Deployment
newRS *exp.ReplicaSet
oldRSs []*exp.ReplicaSet
expectedNew *exp.ReplicaSet
expectedOld []*exp.ReplicaSet
desiredReplicasAnnotations map[string]int32
}{
{
name: "normal scaling event: 10 -> 12",
deployment: newDeployment(12, nil),
oldDeployment: newDeployment(10, nil),
newRS: rs("foo-v1", 10, nil, newTimestamp),
oldRSs: []*exp.ReplicaSet{},
expectedNew: rs("foo-v1", 12, nil, newTimestamp),
expectedOld: []*exp.ReplicaSet{},
},
{
name: "normal scaling event: 10 -> 5",
deployment: newDeployment(5, nil),
oldDeployment: newDeployment(10, nil),
newRS: rs("foo-v1", 10, nil, newTimestamp),
oldRSs: []*exp.ReplicaSet{},
expectedNew: rs("foo-v1", 5, nil, newTimestamp),
expectedOld: []*exp.ReplicaSet{},
},
{
name: "proportional scaling: 5 -> 10",
deployment: newDeployment(10, nil),
oldDeployment: newDeployment(5, nil),
newRS: rs("foo-v2", 2, nil, newTimestamp),
oldRSs: []*exp.ReplicaSet{rs("foo-v1", 3, nil, oldTimestamp)},
expectedNew: rs("foo-v2", 4, nil, newTimestamp),
expectedOld: []*exp.ReplicaSet{rs("foo-v1", 6, nil, oldTimestamp)},
},
{
name: "proportional scaling: 5 -> 3",
deployment: newDeployment(3, nil),
oldDeployment: newDeployment(5, nil),
newRS: rs("foo-v2", 2, nil, newTimestamp),
oldRSs: []*exp.ReplicaSet{rs("foo-v1", 3, nil, oldTimestamp)},
expectedNew: rs("foo-v2", 1, nil, newTimestamp),
expectedOld: []*exp.ReplicaSet{rs("foo-v1", 2, nil, oldTimestamp)},
},
{
name: "proportional scaling: 9 -> 4",
deployment: newDeployment(4, nil),
oldDeployment: newDeployment(9, nil),
newRS: rs("foo-v2", 8, nil, newTimestamp),
oldRSs: []*exp.ReplicaSet{rs("foo-v1", 1, nil, oldTimestamp)},
expectedNew: rs("foo-v2", 4, nil, newTimestamp),
expectedOld: []*exp.ReplicaSet{rs("foo-v1", 0, nil, oldTimestamp)},
},
{
name: "proportional scaling: 7 -> 10",
deployment: newDeployment(10, nil),
oldDeployment: newDeployment(7, nil),
newRS: rs("foo-v3", 2, nil, newTimestamp),
oldRSs: []*exp.ReplicaSet{rs("foo-v2", 3, nil, oldTimestamp), rs("foo-v1", 2, nil, olderTimestamp)},
expectedNew: rs("foo-v3", 3, nil, newTimestamp),
expectedOld: []*exp.ReplicaSet{rs("foo-v2", 4, nil, oldTimestamp), rs("foo-v1", 3, nil, olderTimestamp)},
},
{
name: "proportional scaling: 13 -> 8",
deployment: newDeployment(8, nil),
oldDeployment: newDeployment(13, nil),
newRS: rs("foo-v3", 2, nil, newTimestamp),
oldRSs: []*exp.ReplicaSet{rs("foo-v2", 8, nil, oldTimestamp), rs("foo-v1", 3, nil, olderTimestamp)},
expectedNew: rs("foo-v3", 1, nil, newTimestamp),
expectedOld: []*exp.ReplicaSet{rs("foo-v2", 5, nil, oldTimestamp), rs("foo-v1", 2, nil, olderTimestamp)},
},
// Scales up the new replica set.
{
name: "leftover distribution: 3 -> 4",
deployment: newDeployment(4, nil),
oldDeployment: newDeployment(3, nil),
newRS: rs("foo-v3", 1, nil, newTimestamp),
oldRSs: []*exp.ReplicaSet{rs("foo-v2", 1, nil, oldTimestamp), rs("foo-v1", 1, nil, olderTimestamp)},
expectedNew: rs("foo-v3", 2, nil, newTimestamp),
expectedOld: []*exp.ReplicaSet{rs("foo-v2", 1, nil, oldTimestamp), rs("foo-v1", 1, nil, olderTimestamp)},
},
// Scales down the older replica set.
{
name: "leftover distribution: 3 -> 2",
deployment: newDeployment(2, nil),
oldDeployment: newDeployment(3, nil),
newRS: rs("foo-v3", 1, nil, newTimestamp),
oldRSs: []*exp.ReplicaSet{rs("foo-v2", 1, nil, oldTimestamp), rs("foo-v1", 1, nil, olderTimestamp)},
expectedNew: rs("foo-v3", 1, nil, newTimestamp),
expectedOld: []*exp.ReplicaSet{rs("foo-v2", 1, nil, oldTimestamp), rs("foo-v1", 0, nil, olderTimestamp)},
},
// Scales up the latest replica set first.
{
name: "proportional scaling (no new rs): 4 -> 5",
deployment: newDeployment(5, nil),
oldDeployment: newDeployment(4, nil),
newRS: nil,
oldRSs: []*exp.ReplicaSet{rs("foo-v2", 2, nil, oldTimestamp), rs("foo-v1", 2, nil, olderTimestamp)},
expectedNew: nil,
expectedOld: []*exp.ReplicaSet{rs("foo-v2", 3, nil, oldTimestamp), rs("foo-v1", 2, nil, olderTimestamp)},
},
// Scales down to zero
{
name: "proportional scaling: 6 -> 0",
deployment: newDeployment(0, nil),
oldDeployment: newDeployment(6, nil),
newRS: rs("foo-v3", 3, nil, newTimestamp),
oldRSs: []*exp.ReplicaSet{rs("foo-v2", 2, nil, oldTimestamp), rs("foo-v1", 1, nil, olderTimestamp)},
expectedNew: rs("foo-v3", 0, nil, newTimestamp),
expectedOld: []*exp.ReplicaSet{rs("foo-v2", 0, nil, oldTimestamp), rs("foo-v1", 0, nil, olderTimestamp)},
},
// Scales up from zero
{
name: "proportional scaling: 0 -> 6",
deployment: newDeployment(6, nil),
oldDeployment: newDeployment(0, nil),
newRS: rs("foo-v3", 0, nil, newTimestamp),
oldRSs: []*exp.ReplicaSet{rs("foo-v2", 0, nil, oldTimestamp), rs("foo-v1", 0, nil, olderTimestamp)},
expectedNew: rs("foo-v3", 6, nil, newTimestamp),
expectedOld: []*exp.ReplicaSet{rs("foo-v2", 0, nil, oldTimestamp), rs("foo-v1", 0, nil, olderTimestamp)},
},
// Scenario: deployment.spec.replicas == 3 ( foo-v1.spec.replicas == foo-v2.spec.replicas == foo-v3.spec.replicas == 1 )
// Deployment is scaled to 5. foo-v3.spec.replicas and foo-v2.spec.replicas should increment by 1 but foo-v2 fails to
// update.
{
name: "failed rs update",
deployment: newDeployment(5, nil),
oldDeployment: newDeployment(5, nil),
newRS: rs("foo-v3", 2, nil, newTimestamp),
oldRSs: []*exp.ReplicaSet{rs("foo-v2", 1, nil, oldTimestamp), rs("foo-v1", 1, nil, olderTimestamp)},
expectedNew: rs("foo-v3", 2, nil, newTimestamp),
expectedOld: []*exp.ReplicaSet{rs("foo-v2", 2, nil, oldTimestamp), rs("foo-v1", 1, nil, olderTimestamp)},
desiredReplicasAnnotations: map[string]int32{"foo-v2": int32(3)},
},
{
name: "deployment with surge pods",
deployment: newDeploymentEnhanced(20, intstr.FromInt(2)),
oldDeployment: newDeploymentEnhanced(10, intstr.FromInt(2)),
newRS: rs("foo-v2", 6, nil, newTimestamp),
oldRSs: []*exp.ReplicaSet{rs("foo-v1", 6, nil, oldTimestamp)},
expectedNew: rs("foo-v2", 11, nil, newTimestamp),
expectedOld: []*exp.ReplicaSet{rs("foo-v1", 11, nil, oldTimestamp)},
},
{
name: "change both surge and size",
deployment: newDeploymentEnhanced(50, intstr.FromInt(6)),
oldDeployment: newDeploymentEnhanced(10, intstr.FromInt(3)),
newRS: rs("foo-v2", 5, nil, newTimestamp),
oldRSs: []*exp.ReplicaSet{rs("foo-v1", 8, nil, oldTimestamp)},
expectedNew: rs("foo-v2", 22, nil, newTimestamp),
expectedOld: []*exp.ReplicaSet{rs("foo-v1", 34, nil, oldTimestamp)},
},
}
for _, test := range tests {
_ = olderTimestamp
t.Log(test.name)
fake := fake.Clientset{}
dc := &DeploymentController{
client: &fake,
eventRecorder: &record.FakeRecorder{},
}
if test.newRS != nil {
desiredReplicas := test.oldDeployment.Spec.Replicas
if desired, ok := test.desiredReplicasAnnotations[test.newRS.Name]; ok {
desiredReplicas = desired
}
setReplicasAnnotations(test.newRS, desiredReplicas, desiredReplicas+maxSurge(*test.oldDeployment))
}
for i := range test.oldRSs {
rs := test.oldRSs[i]
if rs == nil {
continue
}
desiredReplicas := test.oldDeployment.Spec.Replicas
if desired, ok := test.desiredReplicasAnnotations[rs.Name]; ok {
desiredReplicas = desired
}
setReplicasAnnotations(rs, desiredReplicas, desiredReplicas+maxSurge(*test.oldDeployment))
}
if err := dc.scale(test.deployment, test.newRS, test.oldRSs); err != nil {
t.Errorf("%s: unexpected error: %v", test.name, err)
continue
}
if test.expectedNew != nil && test.newRS != nil && test.expectedNew.Spec.Replicas != test.newRS.Spec.Replicas {
t.Errorf("%s: expected new replicas: %d, got: %d", test.name, test.expectedNew.Spec.Replicas, test.newRS.Spec.Replicas)
continue
}
if len(test.expectedOld) != len(test.oldRSs) {
t.Errorf("%s: expected %d old replica sets, got %d", test.name, len(test.expectedOld), len(test.oldRSs))
continue
}
for n := range test.oldRSs {
rs := test.oldRSs[n]
exp := test.expectedOld[n]
if exp.Spec.Replicas != rs.Spec.Replicas {
t.Errorf("%s: expected old (%s) replicas: %d, got: %d", test.name, rs.Name, exp.Spec.Replicas, rs.Spec.Replicas)
}
}
}
}
func TestDeploymentController_reconcileNewReplicaSet(t *testing.T) {
tests := []struct {
deploymentReplicas int
@ -188,8 +449,8 @@ func TestDeploymentController_reconcileNewReplicaSet(t *testing.T) {
for i, test := range tests {
t.Logf("executing scenario %d", i)
newRS := rs("foo-v2", test.newReplicas, nil)
oldRS := rs("foo-v2", test.oldReplicas, nil)
newRS := rs("foo-v2", test.newReplicas, nil, noTimestamp)
oldRS := rs("foo-v2", test.oldReplicas, nil, noTimestamp)
allRSs := []*exp.ReplicaSet{newRS, oldRS}
deployment := deployment("foo", test.deploymentReplicas, test.maxSurge, intstr.FromInt(0), nil)
fake := fake.Clientset{}
@ -289,8 +550,8 @@ func TestDeploymentController_reconcileOldReplicaSets(t *testing.T) {
newSelector := map[string]string{"foo": "new"}
oldSelector := map[string]string{"foo": "old"}
newRS := rs("foo-new", test.newReplicas, newSelector)
oldRS := rs("foo-old", test.oldReplicas, oldSelector)
newRS := rs("foo-new", test.newReplicas, newSelector, noTimestamp)
oldRS := rs("foo-old", test.oldReplicas, oldSelector, noTimestamp)
oldRSs := []*exp.ReplicaSet{oldRS}
allRSs := []*exp.ReplicaSet{oldRS, newRS}
@ -429,7 +690,7 @@ func TestDeploymentController_cleanupUnhealthyReplicas(t *testing.T) {
for i, test := range tests {
t.Logf("executing scenario %d", i)
oldRS := rs("foo-v2", test.oldReplicas, nil)
oldRS := rs("foo-v2", test.oldReplicas, nil, noTimestamp)
oldRSs := []*exp.ReplicaSet{oldRS}
deployment := deployment("foo", 10, intstr.FromInt(2), intstr.FromInt(2), nil)
fakeClientset := fake.Clientset{}
@ -538,7 +799,7 @@ func TestDeploymentController_scaleDownOldReplicaSetsForRollingUpdate(t *testing
for i, test := range tests {
t.Logf("executing scenario %d", i)
oldRS := rs("foo-v2", test.oldReplicas, nil)
oldRS := rs("foo-v2", test.oldReplicas, nil, noTimestamp)
allRSs := []*exp.ReplicaSet{oldRS}
oldRSs := []*exp.ReplicaSet{oldRS}
deployment := deployment("foo", test.deploymentReplicas, intstr.FromInt(0), test.maxUnavailable, map[string]string{"foo": "bar"})
@ -610,7 +871,7 @@ func TestDeploymentController_scaleDownOldReplicaSetsForRollingUpdate(t *testing
}
}
func TestDeploymentController_cleanupOldReplicaSets(t *testing.T) {
func TestDeploymentController_cleanupDeployment(t *testing.T) {
selector := map[string]string{"foo": "bar"}
tests := []struct {
@ -669,7 +930,7 @@ func TestDeploymentController_cleanupOldReplicaSets(t *testing.T) {
}
d := newDeployment(1, &tests[i].revisionHistoryLimit)
controller.cleanupOldReplicaSets(test.oldRSs, d)
controller.cleanupDeployment(test.oldRSs, d)
gotDeletions := 0
for _, action := range fake.Actions() {

View File

@ -0,0 +1,161 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package deployment
import (
"fmt"
"sort"
"strconv"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/controller"
deploymentutil "k8s.io/kubernetes/pkg/util/deployment"
"k8s.io/kubernetes/pkg/util/integer"
)
// findActiveOrLatest returns the only active or the latest replica set in case there is at most one active
// replica set. If there are more active replica sets, then we should proportionally scale them.
func findActiveOrLatest(newRS *extensions.ReplicaSet, oldRSs []*extensions.ReplicaSet) *extensions.ReplicaSet {
if newRS == nil && len(oldRSs) == 0 {
return nil
}
sort.Sort(sort.Reverse(controller.ReplicaSetsByCreationTimestamp(oldRSs)))
allRSs := controller.FilterActiveReplicaSets(append(oldRSs, newRS))
switch len(allRSs) {
case 0:
// If there is no active replica set then we should return the newest.
if newRS != nil {
return newRS
}
return oldRSs[0]
case 1:
return allRSs[0]
default:
return nil
}
}
func getDesiredReplicasAnnotation(rs *extensions.ReplicaSet) (int32, bool) {
return getIntFromAnnotation(rs, deploymentutil.DesiredReplicasAnnotation)
}
func getMaxReplicasAnnotation(rs *extensions.ReplicaSet) (int32, bool) {
return getIntFromAnnotation(rs, deploymentutil.MaxReplicasAnnotation)
}
func getIntFromAnnotation(rs *extensions.ReplicaSet, annotationKey string) (int32, bool) {
annotationValue, ok := rs.Annotations[annotationKey]
if !ok {
return int32(0), false
}
intValue, err := strconv.Atoi(annotationValue)
if err != nil {
glog.Warningf("Cannot convert the value %q with annotation key %q for the replica set %q",
annotationValue, annotationKey, rs.Name)
return int32(0), false
}
return int32(intValue), true
}
func setReplicasAnnotations(rs *extensions.ReplicaSet, desiredReplicas, maxReplicas int32) bool {
updated := false
if rs.Annotations == nil {
rs.Annotations = make(map[string]string)
}
desiredString := fmt.Sprintf("%d", desiredReplicas)
if hasString := rs.Annotations[deploymentutil.DesiredReplicasAnnotation]; hasString != desiredString {
rs.Annotations[deploymentutil.DesiredReplicasAnnotation] = desiredString
updated = true
}
maxString := fmt.Sprintf("%d", maxReplicas)
if hasString := rs.Annotations[deploymentutil.MaxReplicasAnnotation]; hasString != maxString {
rs.Annotations[deploymentutil.MaxReplicasAnnotation] = maxString
updated = true
}
return updated
}
// maxUnavailable returns the maximum unavailable pods a rolling deployment can take.
func maxUnavailable(deployment extensions.Deployment) int32 {
if !deploymentutil.IsRollingUpdate(&deployment) {
return int32(0)
}
// Error caught by validation
_, maxUnavailable, _ := deploymentutil.ResolveFenceposts(&deployment.Spec.Strategy.RollingUpdate.MaxSurge, &deployment.Spec.Strategy.RollingUpdate.MaxUnavailable, deployment.Spec.Replicas)
return maxUnavailable
}
// maxSurge returns the maximum surge pods a rolling deployment can take.
func maxSurge(deployment extensions.Deployment) int32 {
if !deploymentutil.IsRollingUpdate(&deployment) {
return int32(0)
}
// Error caught by validation
maxSurge, _, _ := deploymentutil.ResolveFenceposts(&deployment.Spec.Strategy.RollingUpdate.MaxSurge, &deployment.Spec.Strategy.RollingUpdate.MaxUnavailable, deployment.Spec.Replicas)
return maxSurge
}
// getProportion will estimate the proportion for the provided replica set using 1. the current size
// of the parent deployment, 2. the replica count that needs be added on the replica sets of the
// deployment, and 3. the total replicas added in the replica sets of the deployment so far.
func getProportion(rs *extensions.ReplicaSet, d extensions.Deployment, deploymentReplicasToAdd, deploymentReplicasAdded int32) int32 {
if rs == nil || rs.Spec.Replicas == 0 || deploymentReplicasToAdd == 0 || deploymentReplicasToAdd == deploymentReplicasAdded {
return int32(0)
}
rsFraction := getReplicaSetFraction(*rs, d)
allowed := deploymentReplicasToAdd - deploymentReplicasAdded
if deploymentReplicasToAdd > 0 {
// Use the minimum between the replica set fraction and the maximum allowed replicas
// when scaling up. This way we ensure we will not scale up more than the allowed
// replicas we can add.
return integer.Int32Min(rsFraction, allowed)
}
// Use the maximum between the replica set fraction and the maximum allowed replicas
// when scaling down. This way we ensure we will not scale down more than the allowed
// replicas we can remove.
return integer.Int32Max(rsFraction, allowed)
}
// getReplicaSetFraction estimates the fraction of replicas a replica set can have in
// 1. a scaling event during a rollout or 2. when scaling a paused deployment.
func getReplicaSetFraction(rs extensions.ReplicaSet, d extensions.Deployment) int32 {
// If we are scaling down to zero then the fraction of this replica set is its whole size (negative)
if d.Spec.Replicas == int32(0) {
return -rs.Spec.Replicas
}
deploymentReplicas := d.Spec.Replicas + maxSurge(d)
annotatedReplicas, ok := getMaxReplicasAnnotation(&rs)
if !ok {
// If we cannot find the annotation then fallback to the current deployment size. Note that this
// will not be an accurate proportion estimation in case other replica sets have different values
// which means that the deployment was scaled at some point but we at least will stay in limits
// due to the min-max comparisons in getProportion.
annotatedReplicas = d.Status.Replicas
}
// We should never proportionally scale up from zero which means rs.spec.replicas and annotatedReplicas
// will never be zero here.
newRSsize := (float64(rs.Spec.Replicas * deploymentReplicas)) / float64(annotatedReplicas)
return integer.RoundToInt32(newRSsize) - rs.Spec.Replicas
}

View File

@ -41,6 +41,14 @@ import (
const (
// The revision annotation of a deployment's replica sets which records its rollout sequence
RevisionAnnotation = "deployment.kubernetes.io/revision"
// DesiredReplicasAnnotation is the desired replicas for a deployment recorded as an annotation
// in its replica sets. Helps in separating scaling events from the rollout process and for
// determining if the new replica set for a deployment is really saturated.
DesiredReplicasAnnotation = "deployment.kubernetes.io/desired-replicas"
// MaxReplicasAnnotation is the maximum replicas a deployment can have at a given point, which
// is deployment.spec.replicas + maxSurge. Used by the underlying replica sets to estimate their
// proportions in case the deployment has surge replicas.
MaxReplicasAnnotation = "deployment.kubernetes.io/max-replicas"
// Here are the possible rollback event reasons
RollbackRevisionNotFound = "DeploymentRollbackRevisionNotFound"
@ -434,6 +442,21 @@ func NewRSNewReplicas(deployment *extensions.Deployment, allRSs []*extensions.Re
}
}
// IsSaturated checks if the new replica set is saturated by comparing its size with its deployment size.
// Both the deployment and the replica set have to believe this replica set can own all of the desired
// replicas in the deployment and the annotation helps in achieving that.
func IsSaturated(deployment *extensions.Deployment, rs *extensions.ReplicaSet) bool {
if rs == nil {
return false
}
desiredString := rs.Annotations[DesiredReplicasAnnotation]
desired, err := strconv.Atoi(desiredString)
if err != nil {
return false
}
return rs.Spec.Replicas == deployment.Spec.Replicas && int32(desired) == deployment.Spec.Replicas
}
// Polls for deployment to be updated so that deployment.Status.ObservedGeneration >= desiredGeneration.
// Returns error if polling timesout.
func WaitForObservedDeployment(getDeploymentFunc func() (*extensions.Deployment, error), desiredGeneration int64, interval, timeout time.Duration) error {

View File

@ -30,6 +30,20 @@ func IntMin(a, b int) int {
return a
}
func Int32Max(a, b int32) int32 {
if b > a {
return b
}
return a
}
func Int32Min(a, b int32) int32 {
if b < a {
return b
}
return a
}
func Int64Max(a, b int64) int64 {
if b > a {
return b
@ -43,3 +57,11 @@ func Int64Min(a, b int64) int64 {
}
return a
}
// RoundToInt32 rounds floats into integer numbers.
func RoundToInt32(a float64) int32 {
if a < 0 {
return int32(a - 0.5)
}
return int32(a + 0.5)
}

View File

@ -80,6 +80,68 @@ func TestIntMin(t *testing.T) {
}
}
func TestInt32Max(t *testing.T) {
tests := []struct {
nums []int32
expectedMax int32
}{
{
nums: []int32{-1, 0},
expectedMax: 0,
},
{
nums: []int32{-1, -2},
expectedMax: -1,
},
{
nums: []int32{0, 1},
expectedMax: 1,
},
{
nums: []int32{1, 2},
expectedMax: 2,
},
}
for i, test := range tests {
t.Logf("executing scenario %d", i)
if max := Int32Max(test.nums[0], test.nums[1]); max != test.expectedMax {
t.Errorf("expected %v, got %v", test.expectedMax, max)
}
}
}
func TestInt32Min(t *testing.T) {
tests := []struct {
nums []int32
expectedMin int32
}{
{
nums: []int32{-1, 0},
expectedMin: -1,
},
{
nums: []int32{-1, -2},
expectedMin: -2,
},
{
nums: []int32{0, 1},
expectedMin: 0,
},
{
nums: []int32{1, 2},
expectedMin: 1,
},
}
for i, test := range tests {
t.Logf("executing scenario %d", i)
if min := Int32Min(test.nums[0], test.nums[1]); min != test.expectedMin {
t.Errorf("expected %v, got %v", test.expectedMin, min)
}
}
}
func TestInt64Max(t *testing.T) {
tests := []struct {
nums []int64
@ -141,3 +203,42 @@ func TestInt64Min(t *testing.T) {
}
}
}
func TestRoundToInt32(t *testing.T) {
tests := []struct {
num float64
exp int32
}{
{
num: 5.5,
exp: 6,
},
{
num: -3.7,
exp: -4,
},
{
num: 3.49,
exp: 3,
},
{
num: -7.9,
exp: -8,
},
{
num: -4.499999,
exp: -4,
},
{
num: 0,
exp: 0,
},
}
for i, test := range tests {
t.Logf("executing scenario %d", i)
if got := RoundToInt32(test.num); got != test.exp {
t.Errorf("expected %d, got %d", test.exp, got)
}
}
}

View File

@ -144,5 +144,5 @@ func getIntOrPercentValue(intOrStr *IntOrString) (int, bool, error) {
}
return int(v), true, nil
}
return 0, false, fmt.Errorf("invalid value: neither int nor percentage")
return 0, false, fmt.Errorf("invalid type: neither int nor percentage")
}

View File

@ -80,6 +80,9 @@ var _ = framework.KubeDescribe("Deployment", func() {
It("deployment should label adopted RSs and pods", func() {
testDeploymentLabelAdopted(f)
})
It("paused deployment should be able to scale", func() {
testScalePausedDeployment(f)
})
})
func newRS(rsName string, replicas int32, rsPodLabels map[string]string, imageName string, image string) *extensions.ReplicaSet {
@ -569,6 +572,8 @@ func testPausedDeployment(f *framework.Framework) {
podLabels := map[string]string{"name": nginxImageName}
d := newDeployment(deploymentName, 1, podLabels, nginxImageName, nginxImage, extensions.RollingUpdateDeploymentStrategyType, nil)
d.Spec.Paused = true
tgps := int64(20)
d.Spec.Template.Spec.TerminationGracePeriodSeconds = &tgps
framework.Logf("Creating paused deployment %s", deploymentName)
_, err := c.Extensions().Deployments(ns).Create(d)
Expect(err).NotTo(HaveOccurred())
@ -622,21 +627,34 @@ func testPausedDeployment(f *framework.Framework) {
err = framework.WaitForObservedDeployment(c, ns, deploymentName, deployment.Generation)
Expect(err).NotTo(HaveOccurred())
// Update the deployment template - the new replicaset should stay the same
framework.Logf("Updating paused deployment %q", deploymentName)
newTGPS := int64(40)
deployment, err = framework.UpdateDeploymentWithRetries(c, ns, d.Name, func(update *extensions.Deployment) {
update.Spec.Template.Spec.TerminationGracePeriodSeconds = &newTGPS
})
Expect(err).NotTo(HaveOccurred())
err = framework.WaitForObservedDeployment(c, ns, deploymentName, deployment.Generation)
Expect(err).NotTo(HaveOccurred())
framework.Logf("Looking for new replicaset for paused deployment %q (there should be none)", deploymentName)
newRS, err := deploymentutil.GetNewReplicaSet(deployment, c)
Expect(err).NotTo(HaveOccurred())
Expect(framework.DeleteReplicaSet(unversionedClient, ns, newRS.Name)).NotTo(HaveOccurred())
deployment, err = c.Extensions().Deployments(ns).Get(deploymentName)
Expect(err).NotTo(HaveOccurred())
if !deployment.Spec.Paused {
err = fmt.Errorf("deployment %q should be paused", deployment.Name)
if newRS != nil {
err = fmt.Errorf("No replica set should match the deployment template but there is %q", newRS.Name)
Expect(err).NotTo(HaveOccurred())
}
shouldBeNil, err := deploymentutil.GetNewReplicaSet(deployment, c)
_, allOldRs, err := deploymentutil.GetOldReplicaSets(deployment, c)
Expect(err).NotTo(HaveOccurred())
if shouldBeNil != nil {
err = fmt.Errorf("deployment %q shouldn't have a replica set but there is %q", deployment.Name, shouldBeNil.Name)
if len(allOldRs) != 1 {
err = fmt.Errorf("expected an old replica set")
Expect(err).NotTo(HaveOccurred())
}
framework.Logf("Comparing deployment diff with old replica set %q", allOldRs[0].Name)
if *allOldRs[0].Spec.Template.Spec.TerminationGracePeriodSeconds == newTGPS {
err = fmt.Errorf("TerminationGracePeriodSeconds on the replica set should be %d but is %d", tgps, newTGPS)
Expect(err).NotTo(HaveOccurred())
}
}
@ -944,3 +962,54 @@ func testDeploymentLabelAdopted(f *framework.Framework) {
Expect(err).NotTo(HaveOccurred())
Expect(int32(len(pods.Items))).Should(Equal(replicas))
}
func testScalePausedDeployment(f *framework.Framework) {
ns := f.Namespace.Name
c := adapter.FromUnversionedClient(f.Client)
podLabels := map[string]string{"name": nginxImageName}
replicas := int32(3)
// Create a nginx deployment.
deploymentName := "nginx-deployment"
d := newDeployment(deploymentName, replicas, podLabels, nginxImageName, nginxImage, extensions.RollingUpdateDeploymentStrategyType, nil)
framework.Logf("Creating deployment %q", deploymentName)
_, err := c.Extensions().Deployments(ns).Create(d)
Expect(err).NotTo(HaveOccurred())
defer stopDeployment(c, f.Client, ns, deploymentName)
// Check that deployment is created fine.
deployment, err := c.Extensions().Deployments(ns).Get(deploymentName)
Expect(err).NotTo(HaveOccurred())
err = framework.WaitForObservedDeployment(c, ns, deploymentName, deployment.Generation)
Expect(err).NotTo(HaveOccurred())
rs, err := deploymentutil.GetNewReplicaSet(deployment, c)
Expect(err).NotTo(HaveOccurred())
// Pause the deployment and try to scale it.
deployment, err = framework.UpdateDeploymentWithRetries(c, ns, d.Name, func(update *extensions.Deployment) {
update.Spec.Paused = true
})
Expect(err).NotTo(HaveOccurred())
// Scale the paused deployment.
framework.Logf("Scaling up the paused deployment %q", deploymentName)
newReplicas := int32(5)
deployment, err = framework.UpdateDeploymentWithRetries(c, ns, deployment.Name, func(update *extensions.Deployment) {
update.Spec.Replicas = newReplicas
})
Expect(err).NotTo(HaveOccurred())
err = framework.WaitForObservedDeployment(c, ns, deploymentName, deployment.Generation)
Expect(err).NotTo(HaveOccurred())
rs, err = deploymentutil.GetNewReplicaSet(deployment, c)
Expect(err).NotTo(HaveOccurred())
if rs.Spec.Replicas != newReplicas {
err = fmt.Errorf("Expected %d replicas for the new replica set, got %d", newReplicas, rs.Spec.Replicas)
Expect(err).NotTo(HaveOccurred())
}
}