From bad8b6dde4315b86438b3512388e0659e66640af Mon Sep 17 00:00:00 2001 From: Michail Kargakis Date: Fri, 26 Feb 2016 14:34:33 +0100 Subject: [PATCH 1/3] integer: add utility for proper integer rounding --- pkg/util/integer/integer.go | 8 +++++++ pkg/util/integer/integer_test.go | 39 ++++++++++++++++++++++++++++++++ 2 files changed, 47 insertions(+) diff --git a/pkg/util/integer/integer.go b/pkg/util/integer/integer.go index c51cd952d1d..81e28cfbf49 100644 --- a/pkg/util/integer/integer.go +++ b/pkg/util/integer/integer.go @@ -43,3 +43,11 @@ func Int64Min(a, b int64) int64 { } return a } + +// RoundToInt32 rounds floats into integer numbers. +func RoundToInt32(a float64) int32 { + if a < 0 { + return int32(a - 0.5) + } + return int32(a + 0.5) +} diff --git a/pkg/util/integer/integer_test.go b/pkg/util/integer/integer_test.go index 0f885673827..2000a457058 100644 --- a/pkg/util/integer/integer_test.go +++ b/pkg/util/integer/integer_test.go @@ -141,3 +141,42 @@ func TestInt64Min(t *testing.T) { } } } + +func TestRoundToInt32(t *testing.T) { + tests := []struct { + num float64 + exp int32 + }{ + { + num: 5.5, + exp: 6, + }, + { + num: -3.7, + exp: -4, + }, + { + num: 3.49, + exp: 3, + }, + { + num: -7.9, + exp: -8, + }, + { + num: -4.499999, + exp: -4, + }, + { + num: 0, + exp: 0, + }, + } + + for i, test := range tests { + t.Logf("executing scenario %d", i) + if got := RoundToInt32(test.num); got != test.exp { + t.Errorf("expected %d, got %d", test.exp, got) + } + } +} From a098d9fd2476503cf10dff05d331c02307ae9ed4 Mon Sep 17 00:00:00 2001 From: Michail Kargakis Date: Thu, 16 Jun 2016 14:53:25 +0200 Subject: [PATCH 2/3] integer: add int32 min/max helpers --- pkg/util/integer/integer.go | 14 ++++++++ pkg/util/integer/integer_test.go | 62 ++++++++++++++++++++++++++++++++ 2 files changed, 76 insertions(+) diff --git a/pkg/util/integer/integer.go b/pkg/util/integer/integer.go index 81e28cfbf49..99cde5d3633 100644 --- a/pkg/util/integer/integer.go +++ b/pkg/util/integer/integer.go @@ -30,6 +30,20 @@ func IntMin(a, b int) int { return a } +func Int32Max(a, b int32) int32 { + if b > a { + return b + } + return a +} + +func Int32Min(a, b int32) int32 { + if b < a { + return b + } + return a +} + func Int64Max(a, b int64) int64 { if b > a { return b diff --git a/pkg/util/integer/integer_test.go b/pkg/util/integer/integer_test.go index 2000a457058..93ae44c4aae 100644 --- a/pkg/util/integer/integer_test.go +++ b/pkg/util/integer/integer_test.go @@ -80,6 +80,68 @@ func TestIntMin(t *testing.T) { } } +func TestInt32Max(t *testing.T) { + tests := []struct { + nums []int32 + expectedMax int32 + }{ + { + nums: []int32{-1, 0}, + expectedMax: 0, + }, + { + nums: []int32{-1, -2}, + expectedMax: -1, + }, + { + nums: []int32{0, 1}, + expectedMax: 1, + }, + { + nums: []int32{1, 2}, + expectedMax: 2, + }, + } + + for i, test := range tests { + t.Logf("executing scenario %d", i) + if max := Int32Max(test.nums[0], test.nums[1]); max != test.expectedMax { + t.Errorf("expected %v, got %v", test.expectedMax, max) + } + } +} + +func TestInt32Min(t *testing.T) { + tests := []struct { + nums []int32 + expectedMin int32 + }{ + { + nums: []int32{-1, 0}, + expectedMin: -1, + }, + { + nums: []int32{-1, -2}, + expectedMin: -2, + }, + { + nums: []int32{0, 1}, + expectedMin: 0, + }, + { + nums: []int32{1, 2}, + expectedMin: 1, + }, + } + + for i, test := range tests { + t.Logf("executing scenario %d", i) + if min := Int32Min(test.nums[0], test.nums[1]); min != test.expectedMin { + t.Errorf("expected %v, got %v", test.expectedMin, 
min) + } + } +} + func TestInt64Max(t *testing.T) { tests := []struct { nums []int64 From f3d2e3ff2203da20053f9a104f3cce6571dbe293 Mon Sep 17 00:00:00 2001 From: Michail Kargakis Date: Thu, 28 Jan 2016 17:35:14 +0100 Subject: [PATCH 3/3] controller: proportionally scale paused and rolling deployments Enable paused and rolling deployments to be proportionally scaled. Also have cleanup policy work for paused deployments. --- hack/test-cmd.sh | 20 +- hack/testdata/deployment-revision1.yaml | 8 +- hack/testdata/deployment-revision2.yaml | 8 +- pkg/apis/extensions/validation/validation.go | 9 +- pkg/controller/controller_utils.go | 34 ++- .../deployment/deployment_controller.go | 284 ++++++++++++----- .../deployment/deployment_controller_test.go | 289 +++++++++++++++++- pkg/controller/deployment/util.go | 161 ++++++++++ pkg/util/deployment/deployment.go | 23 ++ pkg/util/intstr/intstr.go | 2 +- test/e2e/deployment.go | 89 +++++- 11 files changed, 798 insertions(+), 129 deletions(-) create mode 100644 pkg/controller/deployment/util.go diff --git a/hack/test-cmd.sh b/hack/test-cmd.sh index 78ae3843d05..a0b412564fe 100755 --- a/hack/test-cmd.sh +++ b/hack/test-cmd.sh @@ -1772,35 +1772,35 @@ __EOF__ # Command # Create a deployment (revision 1) kubectl create -f hack/testdata/deployment-revision1.yaml "${kube_flags[@]}" - kube::test::get_object_assert deployment "{{range.items}}{{$id_field}}:{{end}}" 'nginx-deployment:' + kube::test::get_object_assert deployment "{{range.items}}{{$id_field}}:{{end}}" 'nginx:' kube::test::get_object_assert deployment "{{range.items}}{{$deployment_image_field}}:{{end}}" "${IMAGE_DEPLOYMENT_R1}:" # Rollback to revision 1 - should be no-op - kubectl rollout undo deployment nginx-deployment --to-revision=1 "${kube_flags[@]}" + kubectl rollout undo deployment nginx --to-revision=1 "${kube_flags[@]}" kube::test::get_object_assert deployment "{{range.items}}{{$deployment_image_field}}:{{end}}" "${IMAGE_DEPLOYMENT_R1}:" # Update the deployment (revision 2) kubectl apply -f hack/testdata/deployment-revision2.yaml "${kube_flags[@]}" kube::test::get_object_assert deployment.extensions "{{range.items}}{{$deployment_image_field}}:{{end}}" "${IMAGE_DEPLOYMENT_R2}:" # Rollback to revision 1 - kubectl rollout undo deployment nginx-deployment --to-revision=1 "${kube_flags[@]}" + kubectl rollout undo deployment nginx --to-revision=1 "${kube_flags[@]}" sleep 1 kube::test::get_object_assert deployment "{{range.items}}{{$deployment_image_field}}:{{end}}" "${IMAGE_DEPLOYMENT_R1}:" # Rollback to revision 1000000 - should be no-op - kubectl rollout undo deployment nginx-deployment --to-revision=1000000 "${kube_flags[@]}" + kubectl rollout undo deployment nginx --to-revision=1000000 "${kube_flags[@]}" kube::test::get_object_assert deployment "{{range.items}}{{$deployment_image_field}}:{{end}}" "${IMAGE_DEPLOYMENT_R1}:" # Rollback to last revision - kubectl rollout undo deployment nginx-deployment "${kube_flags[@]}" + kubectl rollout undo deployment nginx "${kube_flags[@]}" sleep 1 kube::test::get_object_assert deployment "{{range.items}}{{$deployment_image_field}}:{{end}}" "${IMAGE_DEPLOYMENT_R2}:" # Pause the deployment - kubectl-with-retry rollout pause deployment nginx-deployment "${kube_flags[@]}" + kubectl-with-retry rollout pause deployment nginx "${kube_flags[@]}" # A paused deployment cannot be rolled back - ! kubectl rollout undo deployment nginx-deployment "${kube_flags[@]}" + ! 
kubectl rollout undo deployment nginx "${kube_flags[@]}" # Resume the deployment - kubectl-with-retry rollout resume deployment nginx-deployment "${kube_flags[@]}" + kubectl-with-retry rollout resume deployment nginx "${kube_flags[@]}" # The resumed deployment can now be rolled back - kubectl rollout undo deployment nginx-deployment "${kube_flags[@]}" + kubectl rollout undo deployment nginx "${kube_flags[@]}" # Clean up - kubectl delete deployment nginx-deployment "${kube_flags[@]}" + kubectl delete deployment nginx "${kube_flags[@]}" ### Set image of a deployment # Pre-condition: no deployment exists diff --git a/hack/testdata/deployment-revision1.yaml b/hack/testdata/deployment-revision1.yaml index 75daa6703f8..cfbec36c454 100644 --- a/hack/testdata/deployment-revision1.yaml +++ b/hack/testdata/deployment-revision1.yaml @@ -1,18 +1,18 @@ apiVersion: extensions/v1beta1 kind: Deployment metadata: - name: nginx-deployment + name: nginx labels: - name: nginx-deployment + name: nginx-undo spec: replicas: 3 selector: matchLabels: - name: nginx + name: nginx-undo template: metadata: labels: - name: nginx + name: nginx-undo spec: containers: - name: nginx diff --git a/hack/testdata/deployment-revision2.yaml b/hack/testdata/deployment-revision2.yaml index 092dfb76594..4b171f604bc 100644 --- a/hack/testdata/deployment-revision2.yaml +++ b/hack/testdata/deployment-revision2.yaml @@ -1,18 +1,18 @@ apiVersion: extensions/v1beta1 kind: Deployment metadata: - name: nginx-deployment + name: nginx labels: - name: nginx-deployment + name: nginx-undo spec: replicas: 3 selector: matchLabels: - name: nginx + name: nginx-undo template: metadata: labels: - name: nginx + name: nginx-undo spec: containers: - name: nginx diff --git a/pkg/apis/extensions/validation/validation.go b/pkg/apis/extensions/validation/validation.go index d0405c3770f..564ff6e413a 100644 --- a/pkg/apis/extensions/validation/validation.go +++ b/pkg/apis/extensions/validation/validation.go @@ -147,12 +147,15 @@ var ValidateDeploymentName = apivalidation.NameIsDNSSubdomain func ValidatePositiveIntOrPercent(intOrPercent intstr.IntOrString, fldPath *field.Path) field.ErrorList { allErrs := field.ErrorList{} - if intOrPercent.Type == intstr.String { + switch intOrPercent.Type { + case intstr.String: if !validation.IsValidPercent(intOrPercent.StrVal) { - allErrs = append(allErrs, field.Invalid(fldPath, intOrPercent, "must be an integer or percentage (e.g '5%')")) + allErrs = append(allErrs, field.Invalid(fldPath, intOrPercent, "must be an integer or percentage (e.g '5%%')")) } - } else if intOrPercent.Type == intstr.Int { + case intstr.Int: allErrs = append(allErrs, apivalidation.ValidateNonnegativeField(int64(intOrPercent.IntValue()), fldPath)...) 
+ default: + allErrs = append(allErrs, field.Invalid(fldPath, intOrPercent, "must be an integer or percentage (e.g '5%%')")) } return allErrs } diff --git a/pkg/controller/controller_utils.go b/pkg/controller/controller_utils.go index a7aba40bece..a9886568d7b 100644 --- a/pkg/controller/controller_utils.go +++ b/pkg/controller/controller_utils.go @@ -619,7 +619,9 @@ func IsPodActive(p api.Pod) bool { func FilterActiveReplicaSets(replicaSets []*extensions.ReplicaSet) []*extensions.ReplicaSet { active := []*extensions.ReplicaSet{} for i := range replicaSets { - if replicaSets[i].Spec.Replicas > 0 { + rs := replicaSets[i] + + if rs != nil && rs.Spec.Replicas > 0 { active = append(active, replicaSets[i]) } } @@ -639,7 +641,6 @@ type ControllersByCreationTimestamp []*api.ReplicationController func (o ControllersByCreationTimestamp) Len() int { return len(o) } func (o ControllersByCreationTimestamp) Swap(i, j int) { o[i], o[j] = o[j], o[i] } - func (o ControllersByCreationTimestamp) Less(i, j int) bool { if o[i].CreationTimestamp.Equal(o[j].CreationTimestamp) { return o[i].Name < o[j].Name @@ -647,15 +648,40 @@ func (o ControllersByCreationTimestamp) Less(i, j int) bool { return o[i].CreationTimestamp.Before(o[j].CreationTimestamp) } -// ReplicaSetsByCreationTimestamp sorts a list of ReplicationSets by creation timestamp, using their names as a tie breaker. +// ReplicaSetsByCreationTimestamp sorts a list of ReplicaSet by creation timestamp, using their names as a tie breaker. type ReplicaSetsByCreationTimestamp []*extensions.ReplicaSet func (o ReplicaSetsByCreationTimestamp) Len() int { return len(o) } func (o ReplicaSetsByCreationTimestamp) Swap(i, j int) { o[i], o[j] = o[j], o[i] } - func (o ReplicaSetsByCreationTimestamp) Less(i, j int) bool { if o[i].CreationTimestamp.Equal(o[j].CreationTimestamp) { return o[i].Name < o[j].Name } return o[i].CreationTimestamp.Before(o[j].CreationTimestamp) } + +// ReplicaSetsBySizeOlder sorts a list of ReplicaSet by size in descending order, using their creation timestamp or name as a tie breaker. +// By using the creation timestamp, this sorts from old to new replica sets. +type ReplicaSetsBySizeOlder []*extensions.ReplicaSet + +func (o ReplicaSetsBySizeOlder) Len() int { return len(o) } +func (o ReplicaSetsBySizeOlder) Swap(i, j int) { o[i], o[j] = o[j], o[i] } +func (o ReplicaSetsBySizeOlder) Less(i, j int) bool { + if o[i].Spec.Replicas == o[j].Spec.Replicas { + return ReplicaSetsByCreationTimestamp(o).Less(i, j) + } + return o[i].Spec.Replicas > o[j].Spec.Replicas +} + +// ReplicaSetsBySizeNewer sorts a list of ReplicaSet by size in descending order, using their creation timestamp or name as a tie breaker. +// By using the creation timestamp, this sorts from new to old replica sets. 
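+// For example, replica sets sized [5, 5, 3] where the two 5-replica sets differ only in age are
+// ordered as [newer 5, older 5, 3].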
+type ReplicaSetsBySizeNewer []*extensions.ReplicaSet + +func (o ReplicaSetsBySizeNewer) Len() int { return len(o) } +func (o ReplicaSetsBySizeNewer) Swap(i, j int) { o[i], o[j] = o[j], o[i] } +func (o ReplicaSetsBySizeNewer) Less(i, j int) bool { + if o[i].Spec.Replicas == o[j].Spec.Replicas { + return ReplicaSetsByCreationTimestamp(o).Less(j, i) + } + return o[i].Spec.Replicas > o[j].Spec.Replicas +} diff --git a/pkg/controller/deployment/deployment_controller.go b/pkg/controller/deployment/deployment_controller.go index 66dc993acfb..ef6d7fa71a1 100644 --- a/pkg/controller/deployment/deployment_controller.go +++ b/pkg/controller/deployment/deployment_controller.go @@ -438,13 +438,9 @@ func (dc *DeploymentController) syncDeployment(key string) error { } if d.Spec.Paused { - // TODO: Implement scaling for paused deployments. - // Don't take any action for paused deployment. - // But keep the status up-to-date. - // Ignore paused deployments - glog.V(4).Infof("Updating status only for paused deployment %s/%s", d.Namespace, d.Name) - return dc.syncPausedDeploymentStatus(d) + return dc.sync(d) } + if d.Spec.RollbackTo != nil { revision := d.Spec.RollbackTo.Revision if _, err = dc.rollback(d, &revision); err != nil { @@ -452,27 +448,135 @@ func (dc *DeploymentController) syncDeployment(key string) error { } } + if dc.isScalingEvent(d) { + return dc.sync(d) + } + switch d.Spec.Strategy.Type { case extensions.RecreateDeploymentStrategyType: - return dc.syncRecreateDeployment(d) + return dc.rolloutRecreate(d) case extensions.RollingUpdateDeploymentStrategyType: - return dc.syncRollingUpdateDeployment(d) + return dc.rolloutRolling(d) } return fmt.Errorf("unexpected deployment strategy type: %s", d.Spec.Strategy.Type) } -// Updates the status of a paused deployment -func (dc *DeploymentController) syncPausedDeploymentStatus(deployment *extensions.Deployment) error { +// sync is responsible for reconciling deployments on scaling events or when they +// are paused. +func (dc *DeploymentController) sync(deployment *extensions.Deployment) error { newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(deployment, false) if err != nil { return err } - allRSs := append(controller.FilterActiveReplicaSets(oldRSs), newRS) + if err := dc.scale(deployment, newRS, oldRSs); err != nil { + // If we get an error while trying to scale, the deployment will be requeued + // so we can abort this resync + return err + } + dc.cleanupDeployment(oldRSs, deployment) - // Sync deployment status + allRSs := append(oldRSs, newRS) return dc.syncDeploymentStatus(allRSs, newRS, deployment) } +// scale scales proportionally in order to mitigate risk. Otherwise, scaling up can increase the size +// of the new replica set and scaling down can decrease the sizes of the old ones, both of which would +// have the effect of hastening the rollout progress, which could produce a higher proportion of unavailable +// replicas in the event of a problem with the rolled out template. Should run only on scaling events or +// when a deployment is paused and not during the normal rollout process. +func (dc *DeploymentController) scale(deployment *extensions.Deployment, newRS *extensions.ReplicaSet, oldRSs []*extensions.ReplicaSet) error { + // If there is only one active replica set then we should scale that up to the full count of the + // deployment. If there is no active replica set, then we should scale up the newest replica set. 
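+	// For example, a deployment whose rollout has already completed has exactly one active replica
+	// set (the new one); that replica set is simply resized to deployment.Spec.Replicas below.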
+ if activeOrLatest := findActiveOrLatest(newRS, oldRSs); activeOrLatest != nil { + if activeOrLatest.Spec.Replicas == deployment.Spec.Replicas { + return nil + } + _, _, err := dc.scaleReplicaSetAndRecordEvent(activeOrLatest, deployment.Spec.Replicas, deployment) + return err + } + + // If the new replica set is saturated, old replica sets should be fully scaled down. + // This case handles replica set adoption during a saturated new replica set. + if deploymentutil.IsSaturated(deployment, newRS) { + for _, old := range controller.FilterActiveReplicaSets(oldRSs) { + if _, _, err := dc.scaleReplicaSetAndRecordEvent(old, 0, deployment); err != nil { + return err + } + } + return nil + } + + // There are old replica sets with pods and the new replica set is not saturated. + // We need to proportionally scale all replica sets (new and old) in case of a + // rolling deployment. + if deploymentutil.IsRollingUpdate(deployment) { + allRSs := controller.FilterActiveReplicaSets(append(oldRSs, newRS)) + allRSsReplicas := deploymentutil.GetReplicaCountForReplicaSets(allRSs) + + allowedSize := int32(0) + if deployment.Spec.Replicas > 0 { + allowedSize = deployment.Spec.Replicas + maxSurge(*deployment) + } + + // Number of additional replicas that can be either added or removed from the total + // replicas count. These replicas should be distributed proportionally to the active + // replica sets. + deploymentReplicasToAdd := allowedSize - allRSsReplicas + + // The additional replicas should be distributed proportionally amongst the active + // replica sets from the larger to the smaller in size replica set. Scaling direction + // drives what happens in case we are trying to scale replica sets of the same size. + // In such a case when scaling up, we should scale up newer replica sets first, and + // when scaling down, we should scale down older replica sets first. + scalingOperation := "up" + switch { + case deploymentReplicasToAdd > 0: + sort.Sort(controller.ReplicaSetsBySizeNewer(allRSs)) + + case deploymentReplicasToAdd < 0: + sort.Sort(controller.ReplicaSetsBySizeOlder(allRSs)) + scalingOperation = "down" + + default: /* deploymentReplicasToAdd == 0 */ + // Nothing to add. + return nil + } + + // Iterate over all active replica sets and estimate proportions for each of them. + // The absolute value of deploymentReplicasAdded should never exceed the absolute + // value of deploymentReplicasToAdd. + deploymentReplicasAdded := int32(0) + for i := range allRSs { + rs := allRSs[i] + + proportion := getProportion(rs, *deployment, deploymentReplicasToAdd, deploymentReplicasAdded) + + rs.Spec.Replicas += proportion + deploymentReplicasAdded += proportion + } + + // Update all replica sets + for i := range allRSs { + rs := allRSs[i] + + // Add/remove any leftovers to the largest replica set. 
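+			// For example, when scaling 3 -> 4 across three single-replica replica sets, every
+			// proportion rounds to zero and the single leftover replica ends up on the replica
+			// set at index 0 (the newest one, given the sort order above).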
+ if i == 0 { + leftover := deploymentReplicasToAdd - deploymentReplicasAdded + rs.Spec.Replicas += leftover + if rs.Spec.Replicas < 0 { + rs.Spec.Replicas = 0 + } + } + + if _, err := dc.scaleReplicaSet(rs, rs.Spec.Replicas, deployment, scalingOperation); err != nil { + // Return as soon as we fail, the deployment is requeued + return err + } + } + } + return nil +} + // Rolling back to a revision; no-op if the toRevision is deployment's current revision func (dc *DeploymentController) rollback(deployment *extensions.Deployment, toRevision *int64) (*extensions.Deployment, error) { newRS, allOldRSs, err := dc.getAllReplicaSetsAndSyncRevision(deployment, true) @@ -526,13 +630,13 @@ func (dc *DeploymentController) updateDeploymentAndClearRollbackTo(deployment *e return dc.updateDeployment(deployment) } -func (dc *DeploymentController) syncRecreateDeployment(deployment *extensions.Deployment) error { +func (dc *DeploymentController) rolloutRecreate(deployment *extensions.Deployment) error { // Don't create a new RS if not already existed, so that we avoid scaling up before scaling down newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(deployment, false) if err != nil { return err } - allRSs := append(controller.FilterActiveReplicaSets(oldRSs), newRS) + allRSs := append(oldRSs, newRS) // scale down old replica sets scaledDown, err := dc.scaleDownOldReplicaSetsForRecreate(controller.FilterActiveReplicaSets(oldRSs), deployment) @@ -564,21 +668,18 @@ func (dc *DeploymentController) syncRecreateDeployment(deployment *extensions.De return dc.updateDeploymentStatus(allRSs, newRS, deployment) } - if deployment.Spec.RevisionHistoryLimit != nil { - // Cleanup old replica sets - dc.cleanupOldReplicaSets(oldRSs, deployment) - } + dc.cleanupDeployment(oldRSs, deployment) // Sync deployment status return dc.syncDeploymentStatus(allRSs, newRS, deployment) } -func (dc *DeploymentController) syncRollingUpdateDeployment(deployment *extensions.Deployment) error { +func (dc *DeploymentController) rolloutRolling(deployment *extensions.Deployment) error { newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(deployment, true) if err != nil { return err } - allRSs := append(controller.FilterActiveReplicaSets(oldRSs), newRS) + allRSs := append(oldRSs, newRS) // Scale up, if we can. 
scaledUp, err := dc.reconcileNewReplicaSet(allRSs, newRS, deployment) @@ -600,10 +701,7 @@ func (dc *DeploymentController) syncRollingUpdateDeployment(deployment *extensio return dc.updateDeploymentStatus(allRSs, newRS, deployment) } - if deployment.Spec.RevisionHistoryLimit != nil { - // Cleanup old replicas sets - dc.cleanupOldReplicaSets(oldRSs, deployment) - } + dc.cleanupDeployment(oldRSs, deployment) // Sync deployment status return dc.syncDeploymentStatus(allRSs, newRS, deployment) @@ -611,11 +709,11 @@ func (dc *DeploymentController) syncRollingUpdateDeployment(deployment *extensio // syncDeploymentStatus checks if the status is up-to-date and sync it if necessary func (dc *DeploymentController) syncDeploymentStatus(allRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, d *extensions.Deployment) error { - totalActualReplicas, updatedReplicas, availableReplicas, _, err := dc.calculateStatus(allRSs, newRS, d) + newStatus, err := dc.calculateStatus(allRSs, newRS, d) if err != nil { return err } - if d.Generation > d.Status.ObservedGeneration || d.Status.Replicas != totalActualReplicas || d.Status.UpdatedReplicas != updatedReplicas || d.Status.AvailableReplicas != availableReplicas { + if !reflect.DeepEqual(d.Status, newStatus) { return dc.updateDeploymentStatus(allRSs, newRS, d) } return nil @@ -626,6 +724,8 @@ func (dc *DeploymentController) syncDeploymentStatus(allRSs []*extensions.Replic // 2. Get new RS this deployment targets (whose pod template matches deployment's), and update new RS's revision number to (maxOldV + 1), // only if its revision number is smaller than (maxOldV + 1). If this step failed, we'll update it in the next deployment sync loop. // 3. Copy new RS's revision number to deployment (update deployment's revision). If this step failed, we'll update it in the next deployment sync loop. +// Note that currently the deployment controller is using caches to avoid querying the server for reads. +// This may lead to stale reads of replica sets, thus incorrect deployment status. 
func (dc *DeploymentController) getAllReplicaSetsAndSyncRevision(deployment *extensions.Deployment, createIfNotExisted bool) (*extensions.ReplicaSet, []*extensions.ReplicaSet, error) { // List the deployment's RSes & Pods and apply pod-template-hash info to deployment's adopted RSes/Pods rsList, podList, err := dc.rsAndPodsWithHashKeySynced(deployment) @@ -701,7 +801,7 @@ func (dc *DeploymentController) getNewReplicaSet(deployment *extensions.Deployme return nil, err } else if existingNewRS != nil { // Set existing new replica set's annotation - if setNewReplicaSetAnnotations(deployment, existingNewRS, newRevision) { + if setNewReplicaSetAnnotations(deployment, existingNewRS, newRevision, true) { return dc.client.Extensions().ReplicaSets(deployment.ObjectMeta.Namespace).Update(existingNewRS) } return existingNewRS, nil @@ -731,8 +831,6 @@ func (dc *DeploymentController) getNewReplicaSet(deployment *extensions.Deployme Template: newRSTemplate, }, } - // Set new replica set's annotation - setNewReplicaSetAnnotations(deployment, &newRS, newRevision) allRSs := append(oldRSs, &newRS) newReplicasCount, err := deploymentutil.NewRSNewReplicas(deployment, allRSs, &newRS) if err != nil { @@ -740,6 +838,8 @@ func (dc *DeploymentController) getNewReplicaSet(deployment *extensions.Deployme } newRS.Spec.Replicas = newReplicasCount + // Set new replica set's annotation + setNewReplicaSetAnnotations(deployment, &newRS, newRevision, false) createdRS, err := dc.client.Extensions().ReplicaSets(namespace).Create(&newRS) if err != nil { dc.enqueueDeployment(deployment) @@ -881,7 +981,7 @@ func (dc *DeploymentController) addHashKeyToRSAndPods(rs extensions.ReplicaSet) // setNewReplicaSetAnnotations sets new replica set's annotations appropriately by updating its revision and // copying required deployment annotations to it; it returns true if replica set's annotation is changed. -func setNewReplicaSetAnnotations(deployment *extensions.Deployment, newRS *extensions.ReplicaSet, newRevision string) bool { +func setNewReplicaSetAnnotations(deployment *extensions.Deployment, newRS *extensions.ReplicaSet, newRevision string, exists bool) bool { // First, copy deployment's annotations (except for apply and revision annotations) annotationChanged := copyDeploymentAnnotationsToReplicaSet(deployment, newRS) // Then, update replica set's revision annotation @@ -894,17 +994,26 @@ func setNewReplicaSetAnnotations(deployment *extensions.Deployment, newRS *exten if newRS.Annotations[deploymentutil.RevisionAnnotation] < newRevision { newRS.Annotations[deploymentutil.RevisionAnnotation] = newRevision annotationChanged = true - glog.V(4).Infof("updating replica set %q's revision to %s - %+v\n", newRS.Name, newRevision, newRS) + glog.V(4).Infof("Updating replica set %q revision to %s", newRS.Name, newRevision) + } + if !exists && setReplicasAnnotations(newRS, deployment.Spec.Replicas, deployment.Spec.Replicas+maxSurge(*deployment)) { + annotationChanged = true } return annotationChanged } +var annotationsToSkip = map[string]bool{ + annotations.LastAppliedConfigAnnotation: true, + deploymentutil.RevisionAnnotation: true, + deploymentutil.DesiredReplicasAnnotation: true, + deploymentutil.MaxReplicasAnnotation: true, +} + // skipCopyAnnotation returns true if we should skip copying the annotation with the given annotation key // TODO: How to decide which annotations should / should not be copied? 
// See https://github.com/kubernetes/kubernetes/pull/20035#issuecomment-179558615 func skipCopyAnnotation(key string) bool { - // Skip apply annotations and revision annotations. - return key == annotations.LastAppliedConfigAnnotation || key == deploymentutil.RevisionAnnotation + return annotationsToSkip[key] } func getSkippedAnnotations(annotations map[string]string) map[string]string { @@ -980,12 +1089,12 @@ func (dc *DeploymentController) reconcileNewReplicaSet(allRSs []*extensions.Repl return scaled, err } -func (dc *DeploymentController) getAvailablePodsForReplicaSets(deployment *extensions.Deployment, rss []*extensions.ReplicaSet, minReadySeconds int32) (int32, error) { +func (dc *DeploymentController) getAvailablePodsForReplicaSets(deployment *extensions.Deployment, rss []*extensions.ReplicaSet) (int32, error) { podList, err := dc.listPods(deployment) if err != nil { return 0, err } - return deploymentutil.CountAvailablePodsForReplicaSets(podList, rss, minReadySeconds) + return deploymentutil.CountAvailablePodsForReplicaSets(podList, rss, deployment.Spec.MinReadySeconds) } func (dc *DeploymentController) reconcileOldReplicaSets(allRSs []*extensions.ReplicaSet, oldRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, deployment *extensions.Deployment) (bool, error) { @@ -1002,11 +1111,7 @@ func (dc *DeploymentController) reconcileOldReplicaSets(allRSs []*extensions.Rep if err != nil { return false, fmt.Errorf("could not find available pods: %v", err) } - - _, maxUnavailable, err := deploymentutil.ResolveFenceposts(&deployment.Spec.Strategy.RollingUpdate.MaxSurge, &deployment.Spec.Strategy.RollingUpdate.MaxUnavailable, deployment.Spec.Replicas) - if err != nil { - return false, err - } + maxUnavailable := maxUnavailable(*deployment) // Check if we can scale down. We can scale down in the following 2 cases: // * Some old replica sets have unhealthy replicas, we could safely scale down those unhealthy replicas since that won't further @@ -1108,10 +1213,7 @@ func (dc *DeploymentController) cleanupUnhealthyReplicas(oldRSs []*extensions.Re // scaleDownOldReplicaSetsForRollingUpdate scales down old replica sets when deployment strategy is "RollingUpdate". // Need check maxUnavailable to ensure availability func (dc *DeploymentController) scaleDownOldReplicaSetsForRollingUpdate(allRSs []*extensions.ReplicaSet, oldRSs []*extensions.ReplicaSet, deployment *extensions.Deployment) (int32, error) { - _, maxUnavailable, err := deploymentutil.ResolveFenceposts(&deployment.Spec.Strategy.RollingUpdate.MaxSurge, &deployment.Spec.Strategy.RollingUpdate.MaxUnavailable, deployment.Spec.Replicas) - if err != nil { - return 0, err - } + maxUnavailable := maxUnavailable(*deployment) // Check if we can scale down. minAvailable := deployment.Spec.Replicas - maxUnavailable @@ -1183,7 +1285,13 @@ func (dc *DeploymentController) scaleUpNewReplicaSetForRecreate(newRS *extension return scaled, err } -func (dc *DeploymentController) cleanupOldReplicaSets(oldRSs []*extensions.ReplicaSet, deployment *extensions.Deployment) error { +// cleanupDeployment is responsible for cleaning up a deployment ie. retains all but the latest N old replica sets +// where N=d.Spec.RevisionHistoryLimit. Old replica sets are older versions of the podtemplate of a deployment kept +// around by default 1) for historical reasons and 2) for the ability to rollback a deployment. 
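+// For example, with spec.revisionHistoryLimit=2 and five old replica sets, only the two most
+// recent revisions are kept around and the rest become candidates for deletion.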
+func (dc *DeploymentController) cleanupDeployment(oldRSs []*extensions.ReplicaSet, deployment *extensions.Deployment) error { + if deployment.Spec.RevisionHistoryLimit == nil { + return nil + } diff := int32(len(oldRSs)) - *deployment.Spec.RevisionHistoryLimit if diff <= 0 { return nil @@ -1209,39 +1317,31 @@ func (dc *DeploymentController) cleanupOldReplicaSets(oldRSs []*extensions.Repli } func (dc *DeploymentController) updateDeploymentStatus(allRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, deployment *extensions.Deployment) error { - totalActualReplicas, updatedReplicas, availableReplicas, unavailableReplicas, err := dc.calculateStatus(allRSs, newRS, deployment) + newStatus, err := dc.calculateStatus(allRSs, newRS, deployment) if err != nil { return err } - newDeployment := *deployment - // TODO: Reconcile this with API definition. API definition talks about ready pods, while this just computes created pods. - newDeployment.Status = extensions.DeploymentStatus{ - // TODO: Ensure that if we start retrying status updates, we won't pick up a new Generation value. - ObservedGeneration: deployment.Generation, - Replicas: totalActualReplicas, - UpdatedReplicas: updatedReplicas, - AvailableReplicas: availableReplicas, - UnavailableReplicas: unavailableReplicas, - } - _, err = dc.client.Extensions().Deployments(deployment.ObjectMeta.Namespace).UpdateStatus(&newDeployment) - if err == nil { - glog.V(4).Infof("Updated deployment %s status: %+v", deployment.Name, newDeployment.Status) - } + newDeployment := deployment + newDeployment.Status = newStatus + _, err = dc.client.Extensions().Deployments(deployment.Namespace).UpdateStatus(newDeployment) return err } -func (dc *DeploymentController) calculateStatus(allRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, deployment *extensions.Deployment) (totalActualReplicas, updatedReplicas, availableReplicas, unavailableReplicas int32, err error) { - totalActualReplicas = deploymentutil.GetActualReplicaCountForReplicaSets(allRSs) - updatedReplicas = deploymentutil.GetActualReplicaCountForReplicaSets([]*extensions.ReplicaSet{newRS}) - minReadySeconds := deployment.Spec.MinReadySeconds - availableReplicas, err = dc.getAvailablePodsForReplicaSets(deployment, allRSs, minReadySeconds) +func (dc *DeploymentController) calculateStatus(allRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, deployment *extensions.Deployment) (extensions.DeploymentStatus, error) { + availableReplicas, err := dc.getAvailablePodsForReplicaSets(deployment, allRSs) if err != nil { - err = fmt.Errorf("failed to count available pods: %v", err) - return + return deployment.Status, fmt.Errorf("failed to count available pods: %v", err) } totalReplicas := deploymentutil.GetReplicaCountForReplicaSets(allRSs) - unavailableReplicas = totalReplicas - availableReplicas - return + + return extensions.DeploymentStatus{ + // TODO: Ensure that if we start retrying status updates, we won't pick up a new Generation value. 
+ ObservedGeneration: deployment.Generation, + Replicas: deploymentutil.GetActualReplicaCountForReplicaSets(allRSs), + UpdatedReplicas: deploymentutil.GetActualReplicaCountForReplicaSets([]*extensions.ReplicaSet{newRS}), + AvailableReplicas: availableReplicas, + UnavailableReplicas: totalReplicas - availableReplicas, + }, nil } func (dc *DeploymentController) scaleReplicaSetAndRecordEvent(rs *extensions.ReplicaSet, newScale int32, deployment *extensions.Deployment) (bool, *extensions.ReplicaSet, error) { @@ -1255,24 +1355,25 @@ func (dc *DeploymentController) scaleReplicaSetAndRecordEvent(rs *extensions.Rep } else { scalingOperation = "down" } - newRS, err := dc.scaleReplicaSet(rs, newScale) - if err == nil { - dc.eventRecorder.Eventf(deployment, api.EventTypeNormal, "ScalingReplicaSet", "Scaled %s replica set %s to %d", scalingOperation, rs.Name, newScale) - } else { - dc.enqueueDeployment(deployment) - } + newRS, err := dc.scaleReplicaSet(rs, newScale, deployment, scalingOperation) return true, newRS, err } -func (dc *DeploymentController) scaleReplicaSet(rs *extensions.ReplicaSet, newScale int32) (*extensions.ReplicaSet, error) { - // TODO: Using client for now, update to use store when it is ready. +func (dc *DeploymentController) scaleReplicaSet(rs *extensions.ReplicaSet, newScale int32, deployment *extensions.Deployment, scalingOperation string) (*extensions.ReplicaSet, error) { // NOTE: This mutates the ReplicaSet passed in. Not sure if that's a good idea. rs.Spec.Replicas = newScale - return dc.client.Extensions().ReplicaSets(rs.ObjectMeta.Namespace).Update(rs) + setReplicasAnnotations(rs, deployment.Spec.Replicas, deployment.Spec.Replicas+maxSurge(*deployment)) + rs, err := dc.client.Extensions().ReplicaSets(rs.ObjectMeta.Namespace).Update(rs) + if err == nil { + dc.eventRecorder.Eventf(deployment, api.EventTypeNormal, "ScalingReplicaSet", "Scaled %s replica set %s to %d", scalingOperation, rs.Name, newScale) + } else { + glog.Warningf("Cannot update replica set %q: %v", rs.Name, err) + dc.enqueueDeployment(deployment) + } + return rs, err } func (dc *DeploymentController) updateDeployment(deployment *extensions.Deployment) (*extensions.Deployment, error) { - // TODO: Using client for now, update to use store when it is ready. return dc.client.Extensions().Deployments(deployment.ObjectMeta.Namespace).Update(deployment) } @@ -1300,3 +1401,28 @@ func (dc *DeploymentController) rollbackToTemplate(deployment *extensions.Deploy d, err = dc.updateDeploymentAndClearRollbackTo(deployment) return } + +// isScalingEvent checks whether the provided deployment has been updated with a scaling event +// by looking at the desired-replicas annotation in the active replica sets of the deployment. 
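+// For example, if spec.replicas is changed from 3 to 5 while the active replica sets still carry
+// a desired-replicas annotation of 3, the deployment is considered to be scaling.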
+func (dc *DeploymentController) isScalingEvent(d *extensions.Deployment) bool { + newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, false) + if err != nil { + return false + } + // If there is no new replica set matching this deployment and the deployment isn't paused + // then there is a new rollout that waits to happen + if newRS == nil && !d.Spec.Paused { + return false + } + allRSs := append(oldRSs, newRS) + for _, rs := range controller.FilterActiveReplicaSets(allRSs) { + desired, ok := getDesiredReplicasAnnotation(rs) + if !ok { + continue + } + if desired != d.Spec.Replicas { + return true + } + } + return false +} diff --git a/pkg/controller/deployment/deployment_controller_test.go b/pkg/controller/deployment/deployment_controller_test.go index f9455979cd8..025bbc78871 100644 --- a/pkg/controller/deployment/deployment_controller_test.go +++ b/pkg/controller/deployment/deployment_controller_test.go @@ -19,6 +19,7 @@ package deployment import ( "fmt" "testing" + "time" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/testapi" @@ -33,10 +34,16 @@ import ( "k8s.io/kubernetes/pkg/util/intstr" ) -func rs(name string, replicas int, selector map[string]string) *exp.ReplicaSet { +var ( + alwaysReady = func() bool { return true } + noTimestamp = unversioned.Time{} +) + +func rs(name string, replicas int, selector map[string]string, timestamp unversioned.Time) *exp.ReplicaSet { return &exp.ReplicaSet{ ObjectMeta: api.ObjectMeta{ - Name: name, + Name: name, + CreationTimestamp: timestamp, }, Spec: exp.ReplicaSetSpec{ Replicas: int32(replicas), @@ -47,7 +54,7 @@ func rs(name string, replicas int, selector map[string]string) *exp.ReplicaSet { } func newRSWithStatus(name string, specReplicas, statusReplicas int, selector map[string]string) *exp.ReplicaSet { - rs := rs(name, specReplicas, selector) + rs := rs(name, specReplicas, selector, noTimestamp) rs.Status = exp.ReplicaSetStatus{ Replicas: int32(statusReplicas), } @@ -73,8 +80,6 @@ func deployment(name string, replicas int, maxSurge, maxUnavailable intstr.IntOr } } -var alwaysReady = func() bool { return true } - func newDeployment(replicas int, revisionHistoryLimit *int) *exp.Deployment { var v *int32 if revisionHistoryLimit != nil { @@ -117,6 +122,13 @@ func newDeployment(replicas int, revisionHistoryLimit *int) *exp.Deployment { return &d } +// TODO: Consolidate all deployment helpers into one. +func newDeploymentEnhanced(replicas int, maxSurge intstr.IntOrString) *exp.Deployment { + d := newDeployment(replicas, nil) + d.Spec.Strategy.RollingUpdate.MaxSurge = maxSurge + return d +} + func newReplicaSet(d *exp.Deployment, name string, replicas int) *exp.ReplicaSet { return &exp.ReplicaSet{ ObjectMeta: api.ObjectMeta{ @@ -128,13 +140,262 @@ func newReplicaSet(d *exp.Deployment, name string, replicas int) *exp.ReplicaSet Template: d.Spec.Template, }, } - } func newListOptions() api.ListOptions { return api.ListOptions{} } +// TestScale tests proportional scaling of deployments. Note that fenceposts for +// rolling out (maxUnavailable, maxSurge) have no meaning for simple scaling other +// than recording maxSurge as part of the max-replicas annotation that is taken +// into account in the next scale event (max-replicas is used for calculating the +// proportion of a replica set). 
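+// For example, the "proportional scaling: 5 -> 10" case below doubles the deployment size, so a
+// 2-replica new replica set grows to 4 and a 3-replica old replica set grows to 6.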
+func TestScale(t *testing.T) { + newTimestamp := unversioned.Date(2016, 5, 20, 2, 0, 0, 0, time.UTC) + oldTimestamp := unversioned.Date(2016, 5, 20, 1, 0, 0, 0, time.UTC) + olderTimestamp := unversioned.Date(2016, 5, 20, 0, 0, 0, 0, time.UTC) + + tests := []struct { + name string + deployment *exp.Deployment + oldDeployment *exp.Deployment + + newRS *exp.ReplicaSet + oldRSs []*exp.ReplicaSet + + expectedNew *exp.ReplicaSet + expectedOld []*exp.ReplicaSet + + desiredReplicasAnnotations map[string]int32 + }{ + { + name: "normal scaling event: 10 -> 12", + deployment: newDeployment(12, nil), + oldDeployment: newDeployment(10, nil), + + newRS: rs("foo-v1", 10, nil, newTimestamp), + oldRSs: []*exp.ReplicaSet{}, + + expectedNew: rs("foo-v1", 12, nil, newTimestamp), + expectedOld: []*exp.ReplicaSet{}, + }, + { + name: "normal scaling event: 10 -> 5", + deployment: newDeployment(5, nil), + oldDeployment: newDeployment(10, nil), + + newRS: rs("foo-v1", 10, nil, newTimestamp), + oldRSs: []*exp.ReplicaSet{}, + + expectedNew: rs("foo-v1", 5, nil, newTimestamp), + expectedOld: []*exp.ReplicaSet{}, + }, + { + name: "proportional scaling: 5 -> 10", + deployment: newDeployment(10, nil), + oldDeployment: newDeployment(5, nil), + + newRS: rs("foo-v2", 2, nil, newTimestamp), + oldRSs: []*exp.ReplicaSet{rs("foo-v1", 3, nil, oldTimestamp)}, + + expectedNew: rs("foo-v2", 4, nil, newTimestamp), + expectedOld: []*exp.ReplicaSet{rs("foo-v1", 6, nil, oldTimestamp)}, + }, + { + name: "proportional scaling: 5 -> 3", + deployment: newDeployment(3, nil), + oldDeployment: newDeployment(5, nil), + + newRS: rs("foo-v2", 2, nil, newTimestamp), + oldRSs: []*exp.ReplicaSet{rs("foo-v1", 3, nil, oldTimestamp)}, + + expectedNew: rs("foo-v2", 1, nil, newTimestamp), + expectedOld: []*exp.ReplicaSet{rs("foo-v1", 2, nil, oldTimestamp)}, + }, + { + name: "proportional scaling: 9 -> 4", + deployment: newDeployment(4, nil), + oldDeployment: newDeployment(9, nil), + + newRS: rs("foo-v2", 8, nil, newTimestamp), + oldRSs: []*exp.ReplicaSet{rs("foo-v1", 1, nil, oldTimestamp)}, + + expectedNew: rs("foo-v2", 4, nil, newTimestamp), + expectedOld: []*exp.ReplicaSet{rs("foo-v1", 0, nil, oldTimestamp)}, + }, + { + name: "proportional scaling: 7 -> 10", + deployment: newDeployment(10, nil), + oldDeployment: newDeployment(7, nil), + + newRS: rs("foo-v3", 2, nil, newTimestamp), + oldRSs: []*exp.ReplicaSet{rs("foo-v2", 3, nil, oldTimestamp), rs("foo-v1", 2, nil, olderTimestamp)}, + + expectedNew: rs("foo-v3", 3, nil, newTimestamp), + expectedOld: []*exp.ReplicaSet{rs("foo-v2", 4, nil, oldTimestamp), rs("foo-v1", 3, nil, olderTimestamp)}, + }, + { + name: "proportional scaling: 13 -> 8", + deployment: newDeployment(8, nil), + oldDeployment: newDeployment(13, nil), + + newRS: rs("foo-v3", 2, nil, newTimestamp), + oldRSs: []*exp.ReplicaSet{rs("foo-v2", 8, nil, oldTimestamp), rs("foo-v1", 3, nil, olderTimestamp)}, + + expectedNew: rs("foo-v3", 1, nil, newTimestamp), + expectedOld: []*exp.ReplicaSet{rs("foo-v2", 5, nil, oldTimestamp), rs("foo-v1", 2, nil, olderTimestamp)}, + }, + // Scales up the new replica set. 
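+		// With three equally sized replica sets the proportions all round to zero, so the single
+		// leftover replica lands on foo-v3, the newest replica set.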
+ { + name: "leftover distribution: 3 -> 4", + deployment: newDeployment(4, nil), + oldDeployment: newDeployment(3, nil), + + newRS: rs("foo-v3", 1, nil, newTimestamp), + oldRSs: []*exp.ReplicaSet{rs("foo-v2", 1, nil, oldTimestamp), rs("foo-v1", 1, nil, olderTimestamp)}, + + expectedNew: rs("foo-v3", 2, nil, newTimestamp), + expectedOld: []*exp.ReplicaSet{rs("foo-v2", 1, nil, oldTimestamp), rs("foo-v1", 1, nil, olderTimestamp)}, + }, + // Scales down the older replica set. + { + name: "leftover distribution: 3 -> 2", + deployment: newDeployment(2, nil), + oldDeployment: newDeployment(3, nil), + + newRS: rs("foo-v3", 1, nil, newTimestamp), + oldRSs: []*exp.ReplicaSet{rs("foo-v2", 1, nil, oldTimestamp), rs("foo-v1", 1, nil, olderTimestamp)}, + + expectedNew: rs("foo-v3", 1, nil, newTimestamp), + expectedOld: []*exp.ReplicaSet{rs("foo-v2", 1, nil, oldTimestamp), rs("foo-v1", 0, nil, olderTimestamp)}, + }, + // Scales up the latest replica set first. + { + name: "proportional scaling (no new rs): 4 -> 5", + deployment: newDeployment(5, nil), + oldDeployment: newDeployment(4, nil), + + newRS: nil, + oldRSs: []*exp.ReplicaSet{rs("foo-v2", 2, nil, oldTimestamp), rs("foo-v1", 2, nil, olderTimestamp)}, + + expectedNew: nil, + expectedOld: []*exp.ReplicaSet{rs("foo-v2", 3, nil, oldTimestamp), rs("foo-v1", 2, nil, olderTimestamp)}, + }, + // Scales down to zero + { + name: "proportional scaling: 6 -> 0", + deployment: newDeployment(0, nil), + oldDeployment: newDeployment(6, nil), + + newRS: rs("foo-v3", 3, nil, newTimestamp), + oldRSs: []*exp.ReplicaSet{rs("foo-v2", 2, nil, oldTimestamp), rs("foo-v1", 1, nil, olderTimestamp)}, + + expectedNew: rs("foo-v3", 0, nil, newTimestamp), + expectedOld: []*exp.ReplicaSet{rs("foo-v2", 0, nil, oldTimestamp), rs("foo-v1", 0, nil, olderTimestamp)}, + }, + // Scales up from zero + { + name: "proportional scaling: 0 -> 6", + deployment: newDeployment(6, nil), + oldDeployment: newDeployment(0, nil), + + newRS: rs("foo-v3", 0, nil, newTimestamp), + oldRSs: []*exp.ReplicaSet{rs("foo-v2", 0, nil, oldTimestamp), rs("foo-v1", 0, nil, olderTimestamp)}, + + expectedNew: rs("foo-v3", 6, nil, newTimestamp), + expectedOld: []*exp.ReplicaSet{rs("foo-v2", 0, nil, oldTimestamp), rs("foo-v1", 0, nil, olderTimestamp)}, + }, + // Scenario: deployment.spec.replicas == 3 ( foo-v1.spec.replicas == foo-v2.spec.replicas == foo-v3.spec.replicas == 1 ) + // Deployment is scaled to 5. foo-v3.spec.replicas and foo-v2.spec.replicas should increment by 1 but foo-v2 fails to + // update. 
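+		// foo-v2 keeps a desired-replicas annotation of 3 (see desiredReplicasAnnotations below),
+		// so the retried sync brings it from 1 to 2 replicas while the other replica sets stay put.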
+ { + name: "failed rs update", + deployment: newDeployment(5, nil), + oldDeployment: newDeployment(5, nil), + + newRS: rs("foo-v3", 2, nil, newTimestamp), + oldRSs: []*exp.ReplicaSet{rs("foo-v2", 1, nil, oldTimestamp), rs("foo-v1", 1, nil, olderTimestamp)}, + + expectedNew: rs("foo-v3", 2, nil, newTimestamp), + expectedOld: []*exp.ReplicaSet{rs("foo-v2", 2, nil, oldTimestamp), rs("foo-v1", 1, nil, olderTimestamp)}, + + desiredReplicasAnnotations: map[string]int32{"foo-v2": int32(3)}, + }, + { + name: "deployment with surge pods", + deployment: newDeploymentEnhanced(20, intstr.FromInt(2)), + oldDeployment: newDeploymentEnhanced(10, intstr.FromInt(2)), + + newRS: rs("foo-v2", 6, nil, newTimestamp), + oldRSs: []*exp.ReplicaSet{rs("foo-v1", 6, nil, oldTimestamp)}, + + expectedNew: rs("foo-v2", 11, nil, newTimestamp), + expectedOld: []*exp.ReplicaSet{rs("foo-v1", 11, nil, oldTimestamp)}, + }, + { + name: "change both surge and size", + deployment: newDeploymentEnhanced(50, intstr.FromInt(6)), + oldDeployment: newDeploymentEnhanced(10, intstr.FromInt(3)), + + newRS: rs("foo-v2", 5, nil, newTimestamp), + oldRSs: []*exp.ReplicaSet{rs("foo-v1", 8, nil, oldTimestamp)}, + + expectedNew: rs("foo-v2", 22, nil, newTimestamp), + expectedOld: []*exp.ReplicaSet{rs("foo-v1", 34, nil, oldTimestamp)}, + }, + } + + for _, test := range tests { + _ = olderTimestamp + t.Log(test.name) + fake := fake.Clientset{} + dc := &DeploymentController{ + client: &fake, + eventRecorder: &record.FakeRecorder{}, + } + + if test.newRS != nil { + desiredReplicas := test.oldDeployment.Spec.Replicas + if desired, ok := test.desiredReplicasAnnotations[test.newRS.Name]; ok { + desiredReplicas = desired + } + setReplicasAnnotations(test.newRS, desiredReplicas, desiredReplicas+maxSurge(*test.oldDeployment)) + } + for i := range test.oldRSs { + rs := test.oldRSs[i] + if rs == nil { + continue + } + desiredReplicas := test.oldDeployment.Spec.Replicas + if desired, ok := test.desiredReplicasAnnotations[rs.Name]; ok { + desiredReplicas = desired + } + setReplicasAnnotations(rs, desiredReplicas, desiredReplicas+maxSurge(*test.oldDeployment)) + } + + if err := dc.scale(test.deployment, test.newRS, test.oldRSs); err != nil { + t.Errorf("%s: unexpected error: %v", test.name, err) + continue + } + if test.expectedNew != nil && test.newRS != nil && test.expectedNew.Spec.Replicas != test.newRS.Spec.Replicas { + t.Errorf("%s: expected new replicas: %d, got: %d", test.name, test.expectedNew.Spec.Replicas, test.newRS.Spec.Replicas) + continue + } + if len(test.expectedOld) != len(test.oldRSs) { + t.Errorf("%s: expected %d old replica sets, got %d", test.name, len(test.expectedOld), len(test.oldRSs)) + continue + } + for n := range test.oldRSs { + rs := test.oldRSs[n] + exp := test.expectedOld[n] + if exp.Spec.Replicas != rs.Spec.Replicas { + t.Errorf("%s: expected old (%s) replicas: %d, got: %d", test.name, rs.Name, exp.Spec.Replicas, rs.Spec.Replicas) + } + } + } +} + func TestDeploymentController_reconcileNewReplicaSet(t *testing.T) { tests := []struct { deploymentReplicas int @@ -188,8 +449,8 @@ func TestDeploymentController_reconcileNewReplicaSet(t *testing.T) { for i, test := range tests { t.Logf("executing scenario %d", i) - newRS := rs("foo-v2", test.newReplicas, nil) - oldRS := rs("foo-v2", test.oldReplicas, nil) + newRS := rs("foo-v2", test.newReplicas, nil, noTimestamp) + oldRS := rs("foo-v2", test.oldReplicas, nil, noTimestamp) allRSs := []*exp.ReplicaSet{newRS, oldRS} deployment := deployment("foo", test.deploymentReplicas, 
test.maxSurge, intstr.FromInt(0), nil) fake := fake.Clientset{} @@ -289,8 +550,8 @@ func TestDeploymentController_reconcileOldReplicaSets(t *testing.T) { newSelector := map[string]string{"foo": "new"} oldSelector := map[string]string{"foo": "old"} - newRS := rs("foo-new", test.newReplicas, newSelector) - oldRS := rs("foo-old", test.oldReplicas, oldSelector) + newRS := rs("foo-new", test.newReplicas, newSelector, noTimestamp) + oldRS := rs("foo-old", test.oldReplicas, oldSelector, noTimestamp) oldRSs := []*exp.ReplicaSet{oldRS} allRSs := []*exp.ReplicaSet{oldRS, newRS} @@ -429,7 +690,7 @@ func TestDeploymentController_cleanupUnhealthyReplicas(t *testing.T) { for i, test := range tests { t.Logf("executing scenario %d", i) - oldRS := rs("foo-v2", test.oldReplicas, nil) + oldRS := rs("foo-v2", test.oldReplicas, nil, noTimestamp) oldRSs := []*exp.ReplicaSet{oldRS} deployment := deployment("foo", 10, intstr.FromInt(2), intstr.FromInt(2), nil) fakeClientset := fake.Clientset{} @@ -538,7 +799,7 @@ func TestDeploymentController_scaleDownOldReplicaSetsForRollingUpdate(t *testing for i, test := range tests { t.Logf("executing scenario %d", i) - oldRS := rs("foo-v2", test.oldReplicas, nil) + oldRS := rs("foo-v2", test.oldReplicas, nil, noTimestamp) allRSs := []*exp.ReplicaSet{oldRS} oldRSs := []*exp.ReplicaSet{oldRS} deployment := deployment("foo", test.deploymentReplicas, intstr.FromInt(0), test.maxUnavailable, map[string]string{"foo": "bar"}) @@ -610,7 +871,7 @@ func TestDeploymentController_scaleDownOldReplicaSetsForRollingUpdate(t *testing } } -func TestDeploymentController_cleanupOldReplicaSets(t *testing.T) { +func TestDeploymentController_cleanupDeployment(t *testing.T) { selector := map[string]string{"foo": "bar"} tests := []struct { @@ -669,7 +930,7 @@ func TestDeploymentController_cleanupOldReplicaSets(t *testing.T) { } d := newDeployment(1, &tests[i].revisionHistoryLimit) - controller.cleanupOldReplicaSets(test.oldRSs, d) + controller.cleanupDeployment(test.oldRSs, d) gotDeletions := 0 for _, action := range fake.Actions() { diff --git a/pkg/controller/deployment/util.go b/pkg/controller/deployment/util.go new file mode 100644 index 00000000000..b8c45c090de --- /dev/null +++ b/pkg/controller/deployment/util.go @@ -0,0 +1,161 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package deployment + +import ( + "fmt" + "sort" + "strconv" + + "github.com/golang/glog" + + "k8s.io/kubernetes/pkg/apis/extensions" + "k8s.io/kubernetes/pkg/controller" + deploymentutil "k8s.io/kubernetes/pkg/util/deployment" + "k8s.io/kubernetes/pkg/util/integer" +) + +// findActiveOrLatest returns the only active or the latest replica set in case there is at most one active +// replica set. If there are more active replica sets, then we should proportionally scale them. 
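+// For example, right after a rollout completes only the new replica set still has replicas, so it
+// is returned; in the middle of a rollout both old and new replica sets have replicas and nil is
+// returned, signalling that proportional scaling is needed.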
+func findActiveOrLatest(newRS *extensions.ReplicaSet, oldRSs []*extensions.ReplicaSet) *extensions.ReplicaSet { + if newRS == nil && len(oldRSs) == 0 { + return nil + } + + sort.Sort(sort.Reverse(controller.ReplicaSetsByCreationTimestamp(oldRSs))) + allRSs := controller.FilterActiveReplicaSets(append(oldRSs, newRS)) + + switch len(allRSs) { + case 0: + // If there is no active replica set then we should return the newest. + if newRS != nil { + return newRS + } + return oldRSs[0] + case 1: + return allRSs[0] + default: + return nil + } +} + +func getDesiredReplicasAnnotation(rs *extensions.ReplicaSet) (int32, bool) { + return getIntFromAnnotation(rs, deploymentutil.DesiredReplicasAnnotation) +} + +func getMaxReplicasAnnotation(rs *extensions.ReplicaSet) (int32, bool) { + return getIntFromAnnotation(rs, deploymentutil.MaxReplicasAnnotation) +} + +func getIntFromAnnotation(rs *extensions.ReplicaSet, annotationKey string) (int32, bool) { + annotationValue, ok := rs.Annotations[annotationKey] + if !ok { + return int32(0), false + } + intValue, err := strconv.Atoi(annotationValue) + if err != nil { + glog.Warningf("Cannot convert the value %q with annotation key %q for the replica set %q", + annotationValue, annotationKey, rs.Name) + return int32(0), false + } + return int32(intValue), true +} + +func setReplicasAnnotations(rs *extensions.ReplicaSet, desiredReplicas, maxReplicas int32) bool { + updated := false + if rs.Annotations == nil { + rs.Annotations = make(map[string]string) + } + desiredString := fmt.Sprintf("%d", desiredReplicas) + if hasString := rs.Annotations[deploymentutil.DesiredReplicasAnnotation]; hasString != desiredString { + rs.Annotations[deploymentutil.DesiredReplicasAnnotation] = desiredString + updated = true + } + maxString := fmt.Sprintf("%d", maxReplicas) + if hasString := rs.Annotations[deploymentutil.MaxReplicasAnnotation]; hasString != maxString { + rs.Annotations[deploymentutil.MaxReplicasAnnotation] = maxString + updated = true + } + return updated +} + +// maxUnavailable returns the maximum unavailable pods a rolling deployment can take. +func maxUnavailable(deployment extensions.Deployment) int32 { + if !deploymentutil.IsRollingUpdate(&deployment) { + return int32(0) + } + // Error caught by validation + _, maxUnavailable, _ := deploymentutil.ResolveFenceposts(&deployment.Spec.Strategy.RollingUpdate.MaxSurge, &deployment.Spec.Strategy.RollingUpdate.MaxUnavailable, deployment.Spec.Replicas) + return maxUnavailable +} + +// maxSurge returns the maximum surge pods a rolling deployment can take. +func maxSurge(deployment extensions.Deployment) int32 { + if !deploymentutil.IsRollingUpdate(&deployment) { + return int32(0) + } + // Error caught by validation + maxSurge, _, _ := deploymentutil.ResolveFenceposts(&deployment.Spec.Strategy.RollingUpdate.MaxSurge, &deployment.Spec.Strategy.RollingUpdate.MaxUnavailable, deployment.Spec.Replicas) + return maxSurge +} + +// getProportion will estimate the proportion for the provided replica set using 1. the current size +// of the parent deployment, 2. the replica count that needs be added on the replica sets of the +// deployment, and 3. the total replicas added in the replica sets of the deployment so far. 
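+// For example, when a deployment with maxSurge=0 is scaled from 10 to 15 replicas
+// (deploymentReplicasToAdd=5) and its replica sets hold 6 and 4 replicas (both annotated with
+// max-replicas=10), the estimated proportions are 3 and 2 respectively.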
+func getProportion(rs *extensions.ReplicaSet, d extensions.Deployment, deploymentReplicasToAdd, deploymentReplicasAdded int32) int32 { + if rs == nil || rs.Spec.Replicas == 0 || deploymentReplicasToAdd == 0 || deploymentReplicasToAdd == deploymentReplicasAdded { + return int32(0) + } + + rsFraction := getReplicaSetFraction(*rs, d) + allowed := deploymentReplicasToAdd - deploymentReplicasAdded + + if deploymentReplicasToAdd > 0 { + // Use the minimum between the replica set fraction and the maximum allowed replicas + // when scaling up. This way we ensure we will not scale up more than the allowed + // replicas we can add. + return integer.Int32Min(rsFraction, allowed) + } + // Use the maximum between the replica set fraction and the maximum allowed replicas + // when scaling down. This way we ensure we will not scale down more than the allowed + // replicas we can remove. + return integer.Int32Max(rsFraction, allowed) +} + +// getReplicaSetFraction estimates the fraction of replicas a replica set can have in +// 1. a scaling event during a rollout or 2. when scaling a paused deployment. +func getReplicaSetFraction(rs extensions.ReplicaSet, d extensions.Deployment) int32 { + // If we are scaling down to zero then the fraction of this replica set is its whole size (negative) + if d.Spec.Replicas == int32(0) { + return -rs.Spec.Replicas + } + + deploymentReplicas := d.Spec.Replicas + maxSurge(d) + annotatedReplicas, ok := getMaxReplicasAnnotation(&rs) + if !ok { + // If we cannot find the annotation then fallback to the current deployment size. Note that this + // will not be an accurate proportion estimation in case other replica sets have different values + // which means that the deployment was scaled at some point but we at least will stay in limits + // due to the min-max comparisons in getProportion. + annotatedReplicas = d.Status.Replicas + } + + // We should never proportionally scale up from zero which means rs.spec.replicas and annotatedReplicas + // will never be zero here. + newRSsize := (float64(rs.Spec.Replicas * deploymentReplicas)) / float64(annotatedReplicas) + return integer.RoundToInt32(newRSsize) - rs.Spec.Replicas +} diff --git a/pkg/util/deployment/deployment.go b/pkg/util/deployment/deployment.go index 0442da42901..05b2a83df46 100644 --- a/pkg/util/deployment/deployment.go +++ b/pkg/util/deployment/deployment.go @@ -41,6 +41,14 @@ import ( const ( // The revision annotation of a deployment's replica sets which records its rollout sequence RevisionAnnotation = "deployment.kubernetes.io/revision" + // DesiredReplicasAnnotation is the desired replicas for a deployment recorded as an annotation + // in its replica sets. Helps in separating scaling events from the rollout process and for + // determining if the new replica set for a deployment is really saturated. + DesiredReplicasAnnotation = "deployment.kubernetes.io/desired-replicas" + // MaxReplicasAnnotation is the maximum replicas a deployment can have at a given point, which + // is deployment.spec.replicas + maxSurge. Used by the underlying replica sets to estimate their + // proportions in case the deployment has surge replicas. + MaxReplicasAnnotation = "deployment.kubernetes.io/max-replicas" // Here are the possible rollback event reasons RollbackRevisionNotFound = "DeploymentRollbackRevisionNotFound" @@ -434,6 +442,21 @@ func NewRSNewReplicas(deployment *extensions.Deployment, allRSs []*extensions.Re } } +// IsSaturated checks if the new replica set is saturated by comparing its size with its deployment size. 
+// Both the deployment and the replica set have to believe this replica set can own all of the desired +// replicas in the deployment and the annotation helps in achieving that. +func IsSaturated(deployment *extensions.Deployment, rs *extensions.ReplicaSet) bool { + if rs == nil { + return false + } + desiredString := rs.Annotations[DesiredReplicasAnnotation] + desired, err := strconv.Atoi(desiredString) + if err != nil { + return false + } + return rs.Spec.Replicas == deployment.Spec.Replicas && int32(desired) == deployment.Spec.Replicas +} + // Polls for deployment to be updated so that deployment.Status.ObservedGeneration >= desiredGeneration. // Returns error if polling timesout. func WaitForObservedDeployment(getDeploymentFunc func() (*extensions.Deployment, error), desiredGeneration int64, interval, timeout time.Duration) error { diff --git a/pkg/util/intstr/intstr.go b/pkg/util/intstr/intstr.go index 53bc0fc7e62..8f29f32c6fc 100644 --- a/pkg/util/intstr/intstr.go +++ b/pkg/util/intstr/intstr.go @@ -144,5 +144,5 @@ func getIntOrPercentValue(intOrStr *IntOrString) (int, bool, error) { } return int(v), true, nil } - return 0, false, fmt.Errorf("invalid value: neither int nor percentage") + return 0, false, fmt.Errorf("invalid type: neither int nor percentage") } diff --git a/test/e2e/deployment.go b/test/e2e/deployment.go index e3ce387e68a..d5f268ff103 100644 --- a/test/e2e/deployment.go +++ b/test/e2e/deployment.go @@ -80,6 +80,9 @@ var _ = framework.KubeDescribe("Deployment", func() { It("deployment should label adopted RSs and pods", func() { testDeploymentLabelAdopted(f) }) + It("paused deployment should be able to scale", func() { + testScalePausedDeployment(f) + }) }) func newRS(rsName string, replicas int32, rsPodLabels map[string]string, imageName string, image string) *extensions.ReplicaSet { @@ -569,6 +572,8 @@ func testPausedDeployment(f *framework.Framework) { podLabels := map[string]string{"name": nginxImageName} d := newDeployment(deploymentName, 1, podLabels, nginxImageName, nginxImage, extensions.RollingUpdateDeploymentStrategyType, nil) d.Spec.Paused = true + tgps := int64(20) + d.Spec.Template.Spec.TerminationGracePeriodSeconds = &tgps framework.Logf("Creating paused deployment %s", deploymentName) _, err := c.Extensions().Deployments(ns).Create(d) Expect(err).NotTo(HaveOccurred()) @@ -622,21 +627,34 @@ func testPausedDeployment(f *framework.Framework) { err = framework.WaitForObservedDeployment(c, ns, deploymentName, deployment.Generation) Expect(err).NotTo(HaveOccurred()) + // Update the deployment template - the new replicaset should stay the same + framework.Logf("Updating paused deployment %q", deploymentName) + newTGPS := int64(40) + deployment, err = framework.UpdateDeploymentWithRetries(c, ns, d.Name, func(update *extensions.Deployment) { + update.Spec.Template.Spec.TerminationGracePeriodSeconds = &newTGPS + }) + Expect(err).NotTo(HaveOccurred()) + + err = framework.WaitForObservedDeployment(c, ns, deploymentName, deployment.Generation) + Expect(err).NotTo(HaveOccurred()) + + framework.Logf("Looking for new replicaset for paused deployment %q (there should be none)", deploymentName) newRS, err := deploymentutil.GetNewReplicaSet(deployment, c) Expect(err).NotTo(HaveOccurred()) - Expect(framework.DeleteReplicaSet(unversionedClient, ns, newRS.Name)).NotTo(HaveOccurred()) - - deployment, err = c.Extensions().Deployments(ns).Get(deploymentName) - Expect(err).NotTo(HaveOccurred()) - - if !deployment.Spec.Paused { - err = fmt.Errorf("deployment %q should be paused", 
deployment.Name) + if newRS != nil { + err = fmt.Errorf("No replica set should match the deployment template but there is %q", newRS.Name) Expect(err).NotTo(HaveOccurred()) } - shouldBeNil, err := deploymentutil.GetNewReplicaSet(deployment, c) + + _, allOldRs, err := deploymentutil.GetOldReplicaSets(deployment, c) Expect(err).NotTo(HaveOccurred()) - if shouldBeNil != nil { - err = fmt.Errorf("deployment %q shouldn't have a replica set but there is %q", deployment.Name, shouldBeNil.Name) + if len(allOldRs) != 1 { + err = fmt.Errorf("expected an old replica set") + Expect(err).NotTo(HaveOccurred()) + } + framework.Logf("Comparing deployment diff with old replica set %q", allOldRs[0].Name) + if *allOldRs[0].Spec.Template.Spec.TerminationGracePeriodSeconds == newTGPS { + err = fmt.Errorf("TerminationGracePeriodSeconds on the replica set should be %d but is %d", tgps, newTGPS) Expect(err).NotTo(HaveOccurred()) } } @@ -944,3 +962,54 @@ func testDeploymentLabelAdopted(f *framework.Framework) { Expect(err).NotTo(HaveOccurred()) Expect(int32(len(pods.Items))).Should(Equal(replicas)) } + +func testScalePausedDeployment(f *framework.Framework) { + ns := f.Namespace.Name + c := adapter.FromUnversionedClient(f.Client) + + podLabels := map[string]string{"name": nginxImageName} + replicas := int32(3) + + // Create a nginx deployment. + deploymentName := "nginx-deployment" + d := newDeployment(deploymentName, replicas, podLabels, nginxImageName, nginxImage, extensions.RollingUpdateDeploymentStrategyType, nil) + framework.Logf("Creating deployment %q", deploymentName) + _, err := c.Extensions().Deployments(ns).Create(d) + Expect(err).NotTo(HaveOccurred()) + defer stopDeployment(c, f.Client, ns, deploymentName) + + // Check that deployment is created fine. + deployment, err := c.Extensions().Deployments(ns).Get(deploymentName) + Expect(err).NotTo(HaveOccurred()) + + err = framework.WaitForObservedDeployment(c, ns, deploymentName, deployment.Generation) + Expect(err).NotTo(HaveOccurred()) + + rs, err := deploymentutil.GetNewReplicaSet(deployment, c) + Expect(err).NotTo(HaveOccurred()) + + // Pause the deployment and try to scale it. + deployment, err = framework.UpdateDeploymentWithRetries(c, ns, d.Name, func(update *extensions.Deployment) { + update.Spec.Paused = true + }) + Expect(err).NotTo(HaveOccurred()) + + // Scale the paused deployment. + framework.Logf("Scaling up the paused deployment %q", deploymentName) + newReplicas := int32(5) + deployment, err = framework.UpdateDeploymentWithRetries(c, ns, deployment.Name, func(update *extensions.Deployment) { + update.Spec.Replicas = newReplicas + }) + Expect(err).NotTo(HaveOccurred()) + + err = framework.WaitForObservedDeployment(c, ns, deploymentName, deployment.Generation) + Expect(err).NotTo(HaveOccurred()) + + rs, err = deploymentutil.GetNewReplicaSet(deployment, c) + Expect(err).NotTo(HaveOccurred()) + + if rs.Spec.Replicas != newReplicas { + err = fmt.Errorf("Expected %d replicas for the new replica set, got %d", newReplicas, rs.Spec.Replicas) + Expect(err).NotTo(HaveOccurred()) + } +}
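
The proportional-scaling arithmetic in getProportion/getReplicaSetFraction can be illustrated with a small standalone sketch. The helper names, struct-free signatures, and sample numbers below are illustrative only, not part of the patch; the sketch just mirrors the shape of the logic: each replica set's new size is its current size scaled by new-total/annotated-max, rounded to the nearest integer, and the resulting delta is clamped by whatever is left of the scaling budget.

package main

import "fmt"

// roundToInt32 mirrors the nearest-integer rounding of the patch's
// integer.RoundToInt32 helper: shift by 0.5 toward the sign before truncating.
func roundToInt32(a float64) int32 {
	if a < 0 {
		return int32(a - 0.5)
	}
	return int32(a + 0.5)
}

// min32 and max32 mirror the new integer.Int32Min/Int32Max helpers.
func min32(a, b int32) int32 {
	if b < a {
		return b
	}
	return a
}

func max32(a, b int32) int32 {
	if b > a {
		return b
	}
	return a
}

// proportion is a simplified stand-in for getProportion/getReplicaSetFraction:
// rsReplicas is the replica set's current size, annotatedMax is the max-replicas
// annotation recorded at the previous sync, newTotal is the deployment's new size
// plus surge, and toAdd/added track how much of the scaling budget is left.
func proportion(rsReplicas, annotatedMax, newTotal, toAdd, added int32) int32 {
	if rsReplicas == 0 || toAdd == 0 || toAdd == added {
		return 0
	}
	fraction := roundToInt32(float64(rsReplicas*newTotal)/float64(annotatedMax)) - rsReplicas
	allowed := toAdd - added
	if toAdd > 0 {
		// Scaling up: never hand out more than what is left to add.
		return min32(fraction, allowed)
	}
	// Scaling down: never remove more than what is left to remove.
	return max32(fraction, allowed)
}

func main() {
	// A deployment scaled from 10 to 15 replicas with no surge; its two replica
	// sets are sized 7 and 3 and both carry max-replicas=10 from the last sync.
	toAdd, added := int32(5), int32(0)

	first := proportion(7, 10, 15, toAdd, added) // 7*15/10 = 10.5 -> 11, so +4
	added += first
	second := proportion(3, 10, 15, toAdd, added) // 3*15/10 = 4.5 -> 5, so +2, clamped to +1
	fmt.Println(first, second)                    // 4 1: exactly the 5 requested replicas
}

Note how the min/max clamp keeps the sum of the per-replica-set deltas within the requested scaling budget even though each fraction is rounded independently.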
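
The saturation check combines two signals: the replica set's actual spec.replicas and the desired-replicas annotation written by the deployment controller. A minimal sketch of the same comparison against plain values (illustrative names, no Kubernetes types or client involved):

package main

import (
	"fmt"
	"strconv"
)

// isSaturated mirrors the shape of the patch's IsSaturated check: the replica
// set's size and its desired-replicas annotation must both match the
// deployment's desired size before the rollout is considered complete.
func isSaturated(deploymentReplicas, rsReplicas int32, annotations map[string]string) bool {
	desiredString, ok := annotations["deployment.kubernetes.io/desired-replicas"]
	if !ok {
		return false
	}
	desired, err := strconv.Atoi(desiredString)
	if err != nil {
		return false
	}
	return rsReplicas == deploymentReplicas && int32(desired) == deploymentReplicas
}

func main() {
	ann := map[string]string{"deployment.kubernetes.io/desired-replicas": "5"}
	fmt.Println(isSaturated(5, 5, ann)) // true: size and annotation both agree
	fmt.Println(isSaturated(5, 3, ann)) // false: the replica set has not caught up yet
	fmt.Println(isSaturated(3, 3, ann)) // false: the annotation is stale (still says 5)
}

Requiring agreement from both the spec and the annotation is what lets a scaling event on a paused or mid-rollout deployment be told apart from ordinary rollout progress.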
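
The max-replicas annotation is defined as deployment.spec.replicas + maxSurge, where maxSurge may be given as an integer or a percentage (the intstr tweak in this patch only adjusts the error message for anything else). The sketch below resolves such a value in isolation; it assumes the usual rolling-update convention that surge percentages round up, which this patch does not itself define, and it does not reuse ResolveFenceposts or the intstr types.

package main

import (
	"fmt"
	"math"
	"strconv"
	"strings"
)

// resolveSurge is a simplified stand-in for the IntOrString handling behind
// maxSurge: an integer is used as-is, a percentage is taken from the desired
// replica count and rounded up (assumed convention, not defined by this patch).
func resolveSurge(maxSurge string, replicas int32) (int32, error) {
	if strings.HasSuffix(maxSurge, "%") {
		p, err := strconv.Atoi(strings.TrimSuffix(maxSurge, "%"))
		if err != nil {
			return 0, fmt.Errorf("invalid type: neither int nor percentage")
		}
		return int32(math.Ceil(float64(replicas) * float64(p) / 100)), nil
	}
	v, err := strconv.Atoi(maxSurge)
	if err != nil {
		return 0, fmt.Errorf("invalid type: neither int nor percentage")
	}
	return int32(v), nil
}

func main() {
	replicas := int32(10)
	surge, _ := resolveSurge("25%", replicas)
	// max-replicas annotation = spec.replicas + maxSurge = 10 + 3 = 13
	fmt.Println(replicas + surge)
}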