From a5029bf37373343ce1e9c239435b223f033255e9 Mon Sep 17 00:00:00 2001
From: Michail Kargakis
Date: Thu, 15 Sep 2016 17:57:53 +0200
Subject: [PATCH 1/3] controller: support perma-failed deployments

This commit adds support for failing deployments based on a timeout
parameter defined in the spec. If there is no progress for the amount of
time defined by progressDeadlineSeconds, the deployment will be marked as
failed by adding a condition with a ProgressDeadlineExceeded reason to it.
Progress in the context of a deployment means the creation or adoption of
a new replica set, scaling up new pods, and scaling down old pods.
---
 pkg/controller/controller_utils.go            |  24 +-
 pkg/controller/deployment/BUILD               |   1 +
 .../deployment/deployment_controller.go       |  15 +
 .../deployment/deployment_controller_test.go  |   8 -
 pkg/controller/deployment/progress.go         | 188 +++++++++
 pkg/controller/deployment/recreate.go         |   6 +-
 pkg/controller/deployment/rolling.go          |   6 +-
 pkg/controller/deployment/sync.go             |  83 +++-
 .../deployment/util/deployment_util.go        | 153 +++++++
 .../deployment/util/deployment_util_test.go   | 375 +++++++++++++++++-
 10 files changed, 834 insertions(+), 25 deletions(-)
 create mode 100644 pkg/controller/deployment/progress.go

diff --git a/pkg/controller/controller_utils.go b/pkg/controller/controller_utils.go
index df5b6457621..2ce713e0041 100644
--- a/pkg/controller/controller_utils.go
+++ b/pkg/controller/controller_utils.go
@@ -343,6 +343,22 @@ func NewUIDTrackingControllerExpectations(ce ControllerExpectationsInterface) *U
 	return &UIDTrackingControllerExpectations{ControllerExpectationsInterface: ce, uidStore: cache.NewStore(UIDSetKeyFunc)}
 }
 
+// Reasons for pod events
+const (
+	// FailedCreatePodReason is added in an event and in a replica set condition
+	// when a pod for a replica set fails to be created.
+	FailedCreatePodReason = "FailedCreate"
+	// SuccessfulCreatePodReason is added in an event when a pod for a replica set
+	// is successfully created.
+	SuccessfulCreatePodReason = "SuccessfulCreate"
+	// FailedDeletePodReason is added in an event and in a replica set condition
+	// when a pod for a replica set fails to be deleted.
+	FailedDeletePodReason = "FailedDelete"
+	// SuccessfulDeletePodReason is added in an event when a pod for a replica set
+	// is successfully deleted.
+	SuccessfulDeletePodReason = "SuccessfulDelete"
+)
+
 // PodControlInterface is an interface that knows how to add or delete pods
 // created as an interface to allow testing.
type PodControlInterface interface { @@ -485,7 +501,7 @@ func (r RealPodControl) createPods(nodeName, namespace string, template *api.Pod return fmt.Errorf("unable to create pods, no labels") } if newPod, err := r.KubeClient.Core().Pods(namespace).Create(pod); err != nil { - r.Recorder.Eventf(object, api.EventTypeWarning, "FailedCreate", "Error creating: %v", err) + r.Recorder.Eventf(object, api.EventTypeWarning, FailedCreatePodReason, "Error creating: %v", err) return fmt.Errorf("unable to create pods: %v", err) } else { accessor, err := meta.Accessor(object) @@ -494,7 +510,7 @@ func (r RealPodControl) createPods(nodeName, namespace string, template *api.Pod return nil } glog.V(4).Infof("Controller %v created pod %v", accessor.GetName(), newPod.Name) - r.Recorder.Eventf(object, api.EventTypeNormal, "SuccessfulCreate", "Created pod: %v", newPod.Name) + r.Recorder.Eventf(object, api.EventTypeNormal, SuccessfulCreatePodReason, "Created pod: %v", newPod.Name) } return nil } @@ -505,11 +521,11 @@ func (r RealPodControl) DeletePod(namespace string, podID string, object runtime return fmt.Errorf("object does not have ObjectMeta, %v", err) } if err := r.KubeClient.Core().Pods(namespace).Delete(podID, nil); err != nil { - r.Recorder.Eventf(object, api.EventTypeWarning, "FailedDelete", "Error deleting: %v", err) + r.Recorder.Eventf(object, api.EventTypeWarning, FailedDeletePodReason, "Error deleting: %v", err) return fmt.Errorf("unable to delete pods: %v", err) } else { glog.V(4).Infof("Controller %v deleted pod %v", accessor.GetName(), podID) - r.Recorder.Eventf(object, api.EventTypeNormal, "SuccessfulDelete", "Deleted pod: %v", podID) + r.Recorder.Eventf(object, api.EventTypeNormal, SuccessfulDeletePodReason, "Deleted pod: %v", podID) } return nil } diff --git a/pkg/controller/deployment/BUILD b/pkg/controller/deployment/BUILD index 508df1fda44..c50ebd0d620 100644 --- a/pkg/controller/deployment/BUILD +++ b/pkg/controller/deployment/BUILD @@ -14,6 +14,7 @@ go_library( name = "go_default_library", srcs = [ "deployment_controller.go", + "progress.go", "recreate.go", "rollback.go", "rolling.go", diff --git a/pkg/controller/deployment/deployment_controller.go b/pkg/controller/deployment/deployment_controller.go index 528858ed883..f51a53c99f6 100644 --- a/pkg/controller/deployment/deployment_controller.go +++ b/pkg/controller/deployment/deployment_controller.go @@ -350,6 +350,21 @@ func (dc *DeploymentController) syncDeployment(key string) error { return nil } + // Update deployment conditions with an Unknown condition when pausing/resuming + // a deployment. In this way, we can be sure that we won't timeout when a user + // resumes a Deployment with a set progressDeadlineSeconds. + if err = dc.checkPausedConditions(d); err != nil { + return err + } + + _, err = dc.hasFailed(d) + if err != nil { + return err + } + // TODO: Automatically rollback here if we failed above. Locate the last complete + // revision and populate the rollback spec with it. + // See https://github.com/kubernetes/kubernetes/issues/23211. 
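+	// Note that the boolean returned by hasFailed is intentionally unused for now: the
+	// ProgressDeadlineExceeded condition itself is recorded further down the sync path
+	// by syncRolloutStatus, and the returned value will only matter once the automatic
+	// rollback described in the TODO above is implemented.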
+ if d.Spec.Paused { return dc.sync(d) } diff --git a/pkg/controller/deployment/deployment_controller_test.go b/pkg/controller/deployment/deployment_controller_test.go index 7272055e7d6..03f40b13e5e 100644 --- a/pkg/controller/deployment/deployment_controller_test.go +++ b/pkg/controller/deployment/deployment_controller_test.go @@ -152,14 +152,6 @@ func (f *fixture) expectCreateRSAction(rs *extensions.ReplicaSet) { f.actions = append(f.actions, core.NewCreateAction(unversioned.GroupVersionResource{Resource: "replicasets"}, rs.Namespace, rs)) } -func (f *fixture) expectUpdateRSAction(rs *extensions.ReplicaSet) { - f.actions = append(f.actions, core.NewUpdateAction(unversioned.GroupVersionResource{Resource: "replicasets"}, rs.Namespace, rs)) -} - -func (f *fixture) expectListPodAction(namespace string, opt api.ListOptions) { - f.actions = append(f.actions, core.NewListAction(unversioned.GroupVersionResource{Resource: "pods"}, namespace, opt)) -} - func newFixture(t *testing.T) *fixture { f := &fixture{} f.t = t diff --git a/pkg/controller/deployment/progress.go b/pkg/controller/deployment/progress.go new file mode 100644 index 00000000000..0d915e62e6b --- /dev/null +++ b/pkg/controller/deployment/progress.go @@ -0,0 +1,188 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package deployment + +import ( + "fmt" + "reflect" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/apis/extensions" + "k8s.io/kubernetes/pkg/controller/deployment/util" +) + +// hasFailed determines if a deployment has failed or not by estimating its progress. +// Progress for a deployment is considered when a new replica set is created or adopted, +// and when new pods scale up or old pods scale down. Progress is not estimated for paused +// deployments or when users don't really care about it ie. progressDeadlineSeconds is not +// specified. +func (dc *DeploymentController) hasFailed(d *extensions.Deployment) (bool, error) { + if d.Spec.ProgressDeadlineSeconds == nil || d.Spec.RollbackTo != nil || d.Spec.Paused { + return false, nil + } + + newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, false) + if err != nil { + return false, err + } + + // There is a template change so we don't need to check for any progress right now. + if newRS == nil { + return false, nil + } + + // Look at the status of the deployment - if there is already a NewRSAvailableReason + // then we don't need to estimate any progress. This is needed in order to avoid + // estimating progress for scaling events after a rollout has finished. + cond := util.GetDeploymentCondition(d.Status, extensions.DeploymentProgressing) + if cond != nil && cond.Reason == util.NewRSAvailableReason { + return false, nil + } + + // TODO: Look for permanent failures here. 
+ // See https://github.com/kubernetes/kubernetes/issues/18568 + + allRSs := append(oldRSs, newRS) + newStatus := dc.calculateStatus(allRSs, newRS, d) + + // If the deployment is complete or it is progressing, there is no need to check if it + // has timed out. + if util.DeploymentComplete(d, &newStatus) || util.DeploymentProgressing(d, &newStatus) { + return false, nil + } + + // Check if the deployment has timed out. + return util.DeploymentTimedOut(d, &newStatus), nil +} + +// syncRolloutStatus updates the status of a deployment during a rollout. There are +// cases this helper will run that cannot be prevented from the scaling detection, +// for example a resync of the deployment after it was scaled up. In those cases, +// we shouldn't try to estimate any progress. +func (dc *DeploymentController) syncRolloutStatus(allRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, d *extensions.Deployment) error { + newStatus := dc.calculateStatus(allRSs, newRS, d) + + // If there is no progressDeadlineSeconds set, remove any Progressing condition. + if d.Spec.ProgressDeadlineSeconds == nil { + util.RemoveDeploymentCondition(&newStatus, extensions.DeploymentProgressing) + } + + // If there is only one replica set that is active then that means we are not running + // a new rollout and this is a resync where we don't need to estimate any progress. + // In such a case, we should simply not estimate any progress for this deployment. + currentCond := util.GetDeploymentCondition(d.Status, extensions.DeploymentProgressing) + isResyncEvent := newStatus.Replicas == newStatus.UpdatedReplicas && currentCond != nil && currentCond.Reason == util.NewRSAvailableReason + // Check for progress only if there is a progress deadline set and the latest rollout + // hasn't completed yet. + if d.Spec.ProgressDeadlineSeconds != nil && !isResyncEvent { + switch { + case util.DeploymentComplete(d, &newStatus): + // Update the deployment conditions with a message for the new replica set that + // was successfully deployed. If the condition already exists, we ignore this update. + msg := fmt.Sprintf("Replica set %q has successfully progressed.", newRS.Name) + condition := util.NewDeploymentCondition(extensions.DeploymentProgressing, api.ConditionTrue, util.NewRSAvailableReason, msg) + util.SetDeploymentCondition(&newStatus, *condition) + + case util.DeploymentProgressing(d, &newStatus): + // If there is any progress made, continue by not checking if the deployment failed. This + // behavior emulates the rolling updater progressDeadline check. + msg := fmt.Sprintf("Replica set %q is progressing.", newRS.Name) + condition := util.NewDeploymentCondition(extensions.DeploymentProgressing, api.ConditionTrue, util.ReplicaSetUpdatedReason, msg) + // Update the current Progressing condition or add a new one if it doesn't exist. + // If a Progressing condition with status=true already exists, we should update + // everything but lastTransitionTime. SetDeploymentCondition already does that but + // it also is not updating conditions when the reason of the new condition is the + // same as the old. The Progressing condition is a special case because we want to + // update with the same reason and change just lastUpdateTime iff we notice any + // progress. That's why we handle it here. 
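+			// For example: if the existing Progressing condition is {Status: True, Reason: ReplicaSetUpdated,
+			// LastTransitionTime: T0, LastUpdateTime: T1} and we observe progress now, the stored condition
+			// ends up as {LastTransitionTime: T0, LastUpdateTime: now}, because the old condition is removed
+			// and the refreshed one is re-added below.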
+ if currentCond != nil { + if currentCond.Status == api.ConditionTrue { + condition.LastTransitionTime = currentCond.LastTransitionTime + } + util.RemoveDeploymentCondition(&newStatus, extensions.DeploymentProgressing) + } + util.SetDeploymentCondition(&newStatus, *condition) + + case util.DeploymentTimedOut(d, &newStatus): + // Update the deployment with a timeout condition. If the condition already exists, + // we ignore this update. + msg := fmt.Sprintf("Replica set %q has timed out progressing.", newRS.Name) + condition := util.NewDeploymentCondition(extensions.DeploymentProgressing, api.ConditionFalse, util.TimedOutReason, msg) + util.SetDeploymentCondition(&newStatus, *condition) + } + } + + // Move failure conditions of all replica sets in deployment conditions. For now, + // only one failure condition is returned from getReplicaFailures. + if replicaFailureCond := dc.getReplicaFailures(allRSs, newRS); len(replicaFailureCond) > 0 { + // There will be only one ReplicaFailure condition on the replica set. + util.SetDeploymentCondition(&newStatus, replicaFailureCond[0]) + } else { + util.RemoveDeploymentCondition(&newStatus, extensions.DeploymentReplicaFailure) + } + + // Do not update if there is nothing new to add. + if reflect.DeepEqual(d.Status, newStatus) { + // TODO: If there is no sign of progress at this point then there is a high chance that the + // deployment is stuck. We should resync this deployment at some point[1] in the future[2] and + // check if it has timed out. We definitely need this, otherwise we depend on the controller + // resync interval. See https://github.com/kubernetes/kubernetes/issues/34458. + // + // [1] time.Now() + progressDeadlineSeconds - lastUpdateTime (of the Progressing condition). + // [2] Use dc.queue.AddAfter + return nil + } + + newDeployment := d + newDeployment.Status = newStatus + _, err := dc.client.Extensions().Deployments(newDeployment.Namespace).UpdateStatus(newDeployment) + return err +} + +// getReplicaFailures will convert replica failure conditions from replica sets +// to deployment conditions. +func (dc *DeploymentController) getReplicaFailures(allRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet) []extensions.DeploymentCondition { + var conditions []extensions.DeploymentCondition + if newRS != nil { + for _, c := range newRS.Status.Conditions { + if c.Type != extensions.ReplicaSetReplicaFailure { + continue + } + conditions = append(conditions, util.ReplicaSetToDeploymentCondition(c)) + } + } + + // Return failures for the new replica set over failures from old replica sets. + if len(conditions) > 0 { + return conditions + } + + for i := range allRSs { + rs := allRSs[i] + if rs == nil { + continue + } + + for _, c := range rs.Status.Conditions { + if c.Type != extensions.ReplicaSetReplicaFailure { + continue + } + conditions = append(conditions, util.ReplicaSetToDeploymentCondition(c)) + } + } + return conditions +} diff --git a/pkg/controller/deployment/recreate.go b/pkg/controller/deployment/recreate.go index a5214c2d7dc..5255e930c85 100644 --- a/pkg/controller/deployment/recreate.go +++ b/pkg/controller/deployment/recreate.go @@ -42,7 +42,7 @@ func (dc *DeploymentController) rolloutRecreate(deployment *extensions.Deploymen } if scaledDown { // Update DeploymentStatus - return dc.syncDeploymentStatus(allRSs, newRS, deployment) + return dc.syncRolloutStatus(allRSs, newRS, deployment) } // Wait for all old replica set to scale down to zero. 
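A rough sketch of the requeue suggested in the TODO inside syncRolloutStatus above (illustrative only; it assumes dc.queue exposes AddAfter as the TODO suggests, that key is the deployment's work-queue key, and that the nil check on ProgressDeadlineSeconds has already happened):

	if cond := util.GetDeploymentCondition(newStatus, extensions.DeploymentProgressing); cond != nil {
		// Requeue this deployment shortly after the progress deadline would elapse,
		// so a stuck rollout is re-checked without relying on the full resync interval.
		after := time.Duration(*d.Spec.ProgressDeadlineSeconds)*time.Second - time.Since(cond.LastUpdateTime.Time)
		if after > 0 {
			dc.queue.AddAfter(key, after)
		}
	}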
@@ -67,13 +67,13 @@ func (dc *DeploymentController) rolloutRecreate(deployment *extensions.Deploymen } if scaledUp { // Update DeploymentStatus - return dc.syncDeploymentStatus(allRSs, newRS, deployment) + return dc.syncRolloutStatus(allRSs, newRS, deployment) } dc.cleanupDeployment(oldRSs, deployment) // Sync deployment status - return dc.syncDeploymentStatus(allRSs, newRS, deployment) + return dc.syncRolloutStatus(allRSs, newRS, deployment) } // scaleDownOldReplicaSetsForRecreate scales down old replica sets when deployment strategy is "Recreate" diff --git a/pkg/controller/deployment/rolling.go b/pkg/controller/deployment/rolling.go index 3a3e18f5491..47d7d6f623f 100644 --- a/pkg/controller/deployment/rolling.go +++ b/pkg/controller/deployment/rolling.go @@ -42,7 +42,7 @@ func (dc *DeploymentController) rolloutRolling(deployment *extensions.Deployment } if scaledUp { // Update DeploymentStatus - return dc.syncDeploymentStatus(allRSs, newRS, deployment) + return dc.syncRolloutStatus(allRSs, newRS, deployment) } // Scale down, if we can. @@ -52,13 +52,13 @@ func (dc *DeploymentController) rolloutRolling(deployment *extensions.Deployment } if scaledDown { // Update DeploymentStatus - return dc.syncDeploymentStatus(allRSs, newRS, deployment) + return dc.syncRolloutStatus(allRSs, newRS, deployment) } dc.cleanupDeployment(oldRSs, deployment) // Sync deployment status - return dc.syncDeploymentStatus(allRSs, newRS, deployment) + return dc.syncRolloutStatus(allRSs, newRS, deployment) } func (dc *DeploymentController) reconcileNewReplicaSet(allRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, deployment *extensions.Deployment) (bool, error) { diff --git a/pkg/controller/deployment/sync.go b/pkg/controller/deployment/sync.go index 73a8e51ba98..3880b5470ba 100644 --- a/pkg/controller/deployment/sync.go +++ b/pkg/controller/deployment/sync.go @@ -64,6 +64,40 @@ func (dc *DeploymentController) sync(deployment *extensions.Deployment) error { return dc.syncDeploymentStatus(allRSs, newRS, deployment) } +// checkPausedConditions checks if the given deployment is paused or not and adds an appropriate condition. +// These conditions are needed so that we won't accidentally report lack of progress for resumed deployments +// that were paused for longer than progressDeadlineSeconds. +func (dc *DeploymentController) checkPausedConditions(d *extensions.Deployment) error { + if d.Spec.ProgressDeadlineSeconds == nil { + return nil + } + cond := deploymentutil.GetDeploymentCondition(d.Status, extensions.DeploymentProgressing) + if cond != nil && cond.Reason == deploymentutil.TimedOutReason { + // If we have reported lack of progress, do not overwrite it with a paused condition. 
+ return nil + } + pausedCondExists := cond != nil && cond.Reason == deploymentutil.PausedDeployReason + + needsUpdate := false + if d.Spec.Paused && !pausedCondExists { + condition := deploymentutil.NewDeploymentCondition(extensions.DeploymentProgressing, api.ConditionUnknown, deploymentutil.PausedDeployReason, "Deployment is paused") + deploymentutil.SetDeploymentCondition(&d.Status, *condition) + needsUpdate = true + } else if !d.Spec.Paused && pausedCondExists { + condition := deploymentutil.NewDeploymentCondition(extensions.DeploymentProgressing, api.ConditionUnknown, deploymentutil.ResumedDeployReason, "Deployment is resumed") + deploymentutil.SetDeploymentCondition(&d.Status, *condition) + needsUpdate = true + } + + if !needsUpdate { + return nil + } + + var err error + d, err = dc.client.Extensions().Deployments(d.Namespace).UpdateStatus(d) + return err +} + // getAllReplicaSetsAndSyncRevision returns all the replica sets for the provided deployment (new and all old), with new RS's and deployment's revision updated. // 1. Get all old RSes this deployment targets, and calculate the max revision number among them (maxOldV). // 2. Get new RS this deployment targets (whose pod template matches deployment's), and update new RS's revision number to (maxOldV + 1), @@ -267,6 +301,16 @@ func (dc *DeploymentController) getNewReplicaSet(deployment *extensions.Deployme } updateConditions := deploymentutil.SetDeploymentRevision(deployment, newRevision) + // If no other Progressing condition has been recorded and we need to estimate the progress + // of this deployment then it is likely that old users started caring about progress. In that + // case we need to take into account the first time we noticed their new replica set. + cond := deploymentutil.GetDeploymentCondition(deployment.Status, extensions.DeploymentProgressing) + if deployment.Spec.ProgressDeadlineSeconds != nil && cond == nil { + msg := fmt.Sprintf("Found new replica set %q", rsCopy.Name) + condition := deploymentutil.NewDeploymentCondition(extensions.DeploymentProgressing, api.ConditionTrue, deploymentutil.FoundNewRSReason, msg) + deploymentutil.SetDeploymentCondition(&deployment.Status, *condition) + updateConditions = true + } if updateConditions { if deployment, err = dc.client.Extensions().Deployments(deployment.Namespace).UpdateStatus(deployment); err != nil { @@ -311,14 +355,36 @@ func (dc *DeploymentController) getNewReplicaSet(deployment *extensions.Deployme // Set new replica set's annotation deploymentutil.SetNewReplicaSetAnnotations(deployment, &newRS, newRevision, false) createdRS, err := dc.client.Extensions().ReplicaSets(namespace).Create(&newRS) - if err != nil { - return nil, fmt.Errorf("error creating replica set %v: %v", deployment.Name, err) + switch { + // We may end up hitting this due to a slow cache or a fast resync of the deployment. + case errors.IsAlreadyExists(err): + return dc.rsLister.ReplicaSets(namespace).Get(newRS.Name) + case err != nil: + msg := fmt.Sprintf("Failed to create new replica set %q: %v", newRS.Name, err) + if deployment.Spec.ProgressDeadlineSeconds != nil { + cond := deploymentutil.NewDeploymentCondition(extensions.DeploymentProgressing, api.ConditionFalse, deploymentutil.FailedRSCreateReason, msg) + deploymentutil.SetDeploymentCondition(&deployment.Status, *cond) + // We don't really care about this error at this point, since we have a bigger issue to report. + // TODO: Update the rest of the Deployment status, too. 
We may need to do this every time we + // error out in all other places in the controller so that we let users know that their deployments + // have been noticed by the controller, albeit with errors. + // TODO: Identify which errors are permanent and switch DeploymentIsFailed to take into account + // these reasons as well. Related issue: https://github.com/kubernetes/kubernetes/issues/18568 + _, _ = dc.client.Extensions().Deployments(deployment.ObjectMeta.Namespace).UpdateStatus(deployment) + } + dc.eventRecorder.Eventf(deployment, api.EventTypeWarning, deploymentutil.FailedRSCreateReason, msg) + return nil, err } if newReplicasCount > 0 { - dc.eventRecorder.Eventf(deployment, api.EventTypeNormal, "ScalingReplicaSet", "Scaled %s replica set %s to %d", "up", createdRS.Name, newReplicasCount) + dc.eventRecorder.Eventf(deployment, api.EventTypeNormal, "ScalingReplicaSet", "Created new replica set %q and scaled up to %d", createdRS.Name, newReplicasCount) } deploymentutil.SetDeploymentRevision(deployment, newRevision) + if deployment.Spec.ProgressDeadlineSeconds != nil { + msg := fmt.Sprintf("Created new replica set %q", createdRS.Name) + condition := deploymentutil.NewDeploymentCondition(extensions.DeploymentProgressing, api.ConditionTrue, deploymentutil.NewReplicaSetReason, msg) + deploymentutil.SetDeploymentCondition(&deployment.Status, *condition) + } _, err = dc.client.Extensions().Deployments(deployment.Namespace).UpdateStatus(deployment) return createdRS, err } @@ -442,7 +508,7 @@ func (dc *DeploymentController) scaleReplicaSet(rs *extensions.ReplicaSet, newSc deploymentutil.SetReplicasAnnotations(rs, deployment.Spec.Replicas, deployment.Spec.Replicas+deploymentutil.MaxSurge(*deployment)) rs, err := dc.client.Extensions().ReplicaSets(rs.Namespace).Update(rs) if err == nil { - dc.eventRecorder.Eventf(deployment, api.EventTypeNormal, "ScalingReplicaSet", "Scaled %s replica set %s to %d", scalingOperation, rs.Name, newScale) + dc.eventRecorder.Eventf(deployment, api.EventTypeNormal, "ScalingReplicaSet", "Scaled %s replica set %q to %d", scalingOperation, rs.Name, newScale) } return rs, err } @@ -496,6 +562,14 @@ func (dc *DeploymentController) calculateStatus(allRSs []*extensions.ReplicaSet, availableReplicas := deploymentutil.GetAvailableReplicaCountForReplicaSets(allRSs) totalReplicas := deploymentutil.GetReplicaCountForReplicaSets(allRSs) + if availableReplicas >= deployment.Spec.Replicas-deploymentutil.MaxUnavailable(*deployment) { + minAvailability := deploymentutil.NewDeploymentCondition(extensions.DeploymentAvailable, api.ConditionTrue, deploymentutil.MinimumReplicasAvailable, "Deployment has minimum availability.") + deploymentutil.SetDeploymentCondition(&deployment.Status, *minAvailability) + } else { + noMinAvailability := deploymentutil.NewDeploymentCondition(extensions.DeploymentAvailable, api.ConditionFalse, deploymentutil.MinimumReplicasUnavailable, "Deployment does not have minimum availability.") + deploymentutil.SetDeploymentCondition(&deployment.Status, *noMinAvailability) + } + return extensions.DeploymentStatus{ // TODO: Ensure that if we start retrying status updates, we won't pick up a new Generation value. 
ObservedGeneration: deployment.Generation, @@ -503,6 +577,7 @@ func (dc *DeploymentController) calculateStatus(allRSs []*extensions.ReplicaSet, UpdatedReplicas: deploymentutil.GetActualReplicaCountForReplicaSets([]*extensions.ReplicaSet{newRS}), AvailableReplicas: availableReplicas, UnavailableReplicas: totalReplicas - availableReplicas, + Conditions: deployment.Status.Conditions, } } diff --git a/pkg/controller/deployment/util/deployment_util.go b/pkg/controller/deployment/util/deployment_util.go index 3502e77d8f7..66f1c3f8d37 100644 --- a/pkg/controller/deployment/util/deployment_util.go +++ b/pkg/controller/deployment/util/deployment_util.go @@ -70,8 +70,111 @@ const ( // TODO: Delete this annotation when we gracefully handle overlapping selectors. // See https://github.com/kubernetes/kubernetes/issues/2210 SelectorUpdateAnnotation = "deployment.kubernetes.io/selector-updated-at" + + // Reasons for deployment conditions + // + // Progressing: + // + // ReplicaSetUpdatedReason is added in a deployment when one of its replica sets is updated as part + // of the rollout process. + ReplicaSetUpdatedReason = "ReplicaSetUpdated" + // FailedRSCreateReason is added in a deployment when it cannot create a new replica set. + FailedRSCreateReason = "ReplicaSetCreateError" + // NewReplicaSetReason is added in a deployment when it creates a new replica set. + NewReplicaSetReason = "NewReplicaSetCreated" + // FoundNewRSReason is added in a deployment when it adopts an existing replica set. + FoundNewRSReason = "FoundNewReplicaSet" + // NewRSAvailableReason is added in a deployment when its newest replica set is made available + // ie. the number of new pods that have passed readiness checks and run for at least minReadySeconds + // is at least the minimum available pods that need to run for the deployment. + NewRSAvailableReason = "NewReplicaSetAvailable" + // TimedOutReason is added in a deployment when its newest replica set fails to show any progress + // within the given deadline (progressDeadlineSeconds). + TimedOutReason = "ProgressDeadlineExceeded" + // PausedDeployReason is added in a deployment when it is paused. Lack of progress shouldn't be + // estimated once a deployment is paused. + PausedDeployReason = "DeploymentPaused" + // ResumedDeployReason is added in a deployment when it is resumed. Useful for not failing accidentally + // deployments that paused amidst a rollout and are bounded by a deadline. + ResumedDeployReason = "DeploymentResumed" + // + // Available: + // + // MinimumReplicasAvailable is added in a deployment when it has its minimum replicas required available. + MinimumReplicasAvailable = "MinimumReplicasAvailable" + // MinimumReplicasUnavailable is added in a deployment when it doesn't have the minimum required replicas + // available. + MinimumReplicasUnavailable = "MinimumReplicasUnavailable" ) +// NewDeploymentCondition creates a new deployment condition. +func NewDeploymentCondition(condType extensions.DeploymentConditionType, status api.ConditionStatus, reason, message string) *extensions.DeploymentCondition { + return &extensions.DeploymentCondition{ + Type: condType, + Status: status, + LastUpdateTime: unversioned.Now(), + LastTransitionTime: unversioned.Now(), + Reason: reason, + Message: message, + } +} + +// GetDeploymentCondition returns the condition with the provided type. 
+func GetDeploymentCondition(status extensions.DeploymentStatus, condType extensions.DeploymentConditionType) *extensions.DeploymentCondition { + for i := range status.Conditions { + c := status.Conditions[i] + if c.Type == condType { + return &c + } + } + return nil +} + +// SetDeploymentCondition updates the deployment to include the provided condition. If the condition that +// we are about to add already exists and has the same status and reason then we are not going to update. +func SetDeploymentCondition(status *extensions.DeploymentStatus, condition extensions.DeploymentCondition) { + currentCond := GetDeploymentCondition(*status, condition.Type) + if currentCond != nil && currentCond.Status == condition.Status && currentCond.Reason == condition.Reason { + return + } + // Do not update lastTransitionTime if the status of the condition doesn't change. + if currentCond != nil && currentCond.Status == condition.Status { + condition.LastTransitionTime = currentCond.LastTransitionTime + } + newConditions := filterOutCondition(status.Conditions, condition.Type) + status.Conditions = append(newConditions, condition) +} + +// RemoveDeploymentCondition removes the deployment condition with the provided type. +func RemoveDeploymentCondition(status *extensions.DeploymentStatus, condType extensions.DeploymentConditionType) { + status.Conditions = filterOutCondition(status.Conditions, condType) +} + +// filterOutCondition returns a new slice of deployment conditions without conditions with the provided type. +func filterOutCondition(conditions []extensions.DeploymentCondition, condType extensions.DeploymentConditionType) []extensions.DeploymentCondition { + var newConditions []extensions.DeploymentCondition + for _, c := range conditions { + if c.Type == condType { + continue + } + newConditions = append(newConditions, c) + } + return newConditions +} + +// ReplicaSetToDeploymentCondition converts a replica set condition into a deployment condition. +// Useful for promoting replica set failure conditions into deployments. +func ReplicaSetToDeploymentCondition(cond extensions.ReplicaSetCondition) extensions.DeploymentCondition { + return extensions.DeploymentCondition{ + Type: extensions.DeploymentConditionType(cond.Type), + Status: cond.Status, + LastTransitionTime: cond.LastTransitionTime, + LastUpdateTime: cond.LastTransitionTime, + Reason: cond.Reason, + Message: cond.Message, + } +} + // SetDeploymentRevision updates the revision for a deployment. func SetDeploymentRevision(deployment *extensions.Deployment, revision string) bool { updated := false @@ -696,6 +799,56 @@ func IsRollingUpdate(deployment *extensions.Deployment) bool { return deployment.Spec.Strategy.Type == extensions.RollingUpdateDeploymentStrategyType } +// DeploymentComplete considers a deployment to be complete once its desired replicas equals its +// updatedReplicas and it doesn't violate minimum availability. +func DeploymentComplete(deployment *extensions.Deployment, newStatus *extensions.DeploymentStatus) bool { + return newStatus.UpdatedReplicas == deployment.Spec.Replicas && + newStatus.AvailableReplicas >= deployment.Spec.Replicas-MaxUnavailable(*deployment) +} + +// DeploymentProgressing reports progress for a deployment. Progress is estimated by comparing the +// current with the new status of the deployment that the controller is observing. The following +// algorithm is already used in the kubectl rolling updater to report lack of progress. 
+func DeploymentProgressing(deployment *extensions.Deployment, newStatus *extensions.DeploymentStatus) bool { + oldStatus := deployment.Status + + // Old replicas that need to be scaled down + oldStatusOldReplicas := oldStatus.Replicas - oldStatus.UpdatedReplicas + newStatusOldReplicas := newStatus.Replicas - newStatus.UpdatedReplicas + + return (newStatus.UpdatedReplicas > oldStatus.UpdatedReplicas) || (newStatusOldReplicas < oldStatusOldReplicas) +} + +// used for unit testing +var nowFn = func() time.Time { return time.Now() } + +// DeploymentTimedOut considers a deployment to have timed out once its condition that reports progress +// is older than progressDeadlineSeconds or a Progressing condition with a TimedOutReason reason already +// exists. +func DeploymentTimedOut(deployment *extensions.Deployment, newStatus *extensions.DeploymentStatus) bool { + if deployment.Spec.ProgressDeadlineSeconds == nil { + return false + } + + // Look for the Progressing condition. If it doesn't exist, we have no base to estimate progress. + // If it's already set with a TimedOutReason reason, we have already timed out, no need to check + // again. + condition := GetDeploymentCondition(*newStatus, extensions.DeploymentProgressing) + if condition == nil { + return false + } + if condition.Reason == TimedOutReason { + return true + } + + // Look at the difference in seconds between now and the last time we reported any + // progress or tried to create a replica set, or resumed a paused deployment and + // compare against progressDeadlineSeconds. + from := condition.LastTransitionTime + delta := time.Duration(*deployment.Spec.ProgressDeadlineSeconds) * time.Second + return from.Add(delta).Before(nowFn()) +} + // NewRSNewReplicas calculates the number of replicas a deployment's new RS should have. // When one of the followings is true, we're rolling out the deployment; otherwise, we're scaling it. 
// 1) The new RS is saturated: newRS's replicas == deployment's replicas diff --git a/pkg/controller/deployment/util/deployment_util_test.go b/pkg/controller/deployment/util/deployment_util_test.go index 2d85bea2847..96fb29431a4 100644 --- a/pkg/controller/deployment/util/deployment_util_test.go +++ b/pkg/controller/deployment/util/deployment_util_test.go @@ -688,7 +688,6 @@ func TestResolveFenceposts(t *testing.T) { } func TestNewRSNewReplicas(t *testing.T) { - tests := []struct { test string strategyType extensions.DeploymentStrategyType @@ -703,12 +702,12 @@ func TestNewRSNewReplicas(t *testing.T) { 1, 5, 1, 5, }, { - "scale up - to depDeplicas", + "scale up - to depReplicas", extensions.RollingUpdateDeploymentStrategyType, 6, 2, 10, 6, }, { - "recreate - to depDeplicas", + "recreate - to depReplicas", extensions.RecreateDeploymentStrategyType, 3, 1, 1, 3, }, @@ -735,3 +734,373 @@ func TestNewRSNewReplicas(t *testing.T) { } } } + +var ( + condProgressing = func() extensions.DeploymentCondition { + return extensions.DeploymentCondition{ + Type: extensions.DeploymentProgressing, + Status: api.ConditionFalse, + Reason: "ForSomeReason", + } + } + + condProgressing2 = func() extensions.DeploymentCondition { + return extensions.DeploymentCondition{ + Type: extensions.DeploymentProgressing, + Status: api.ConditionTrue, + Reason: "BecauseItIs", + } + } + + condAvailable = func() extensions.DeploymentCondition { + return extensions.DeploymentCondition{ + Type: extensions.DeploymentAvailable, + Status: api.ConditionTrue, + Reason: "AwesomeController", + } + } + + status = func() *extensions.DeploymentStatus { + return &extensions.DeploymentStatus{ + Conditions: []extensions.DeploymentCondition{condProgressing(), condAvailable()}, + } + } +) + +func TestGetCondition(t *testing.T) { + exampleStatus := status() + + tests := []struct { + name string + + status extensions.DeploymentStatus + condType extensions.DeploymentConditionType + condStatus api.ConditionStatus + condReason string + + expected bool + }{ + { + name: "condition exists", + + status: *exampleStatus, + condType: extensions.DeploymentAvailable, + + expected: true, + }, + { + name: "condition does not exist", + + status: *exampleStatus, + condType: extensions.DeploymentReplicaFailure, + + expected: false, + }, + } + + for _, test := range tests { + cond := GetDeploymentCondition(test.status, test.condType) + exists := cond != nil + if exists != test.expected { + t.Errorf("%s: expected condition to exist: %t, got: %t", test.name, test.expected, exists) + } + } +} + +func TestSetCondition(t *testing.T) { + tests := []struct { + name string + + status *extensions.DeploymentStatus + cond extensions.DeploymentCondition + + expectedStatus *extensions.DeploymentStatus + }{ + { + name: "set for the first time", + + status: &extensions.DeploymentStatus{}, + cond: condAvailable(), + + expectedStatus: &extensions.DeploymentStatus{Conditions: []extensions.DeploymentCondition{condAvailable()}}, + }, + { + name: "simple set", + + status: &extensions.DeploymentStatus{Conditions: []extensions.DeploymentCondition{condProgressing()}}, + cond: condAvailable(), + + expectedStatus: status(), + }, + { + name: "overwrite", + + status: &extensions.DeploymentStatus{Conditions: []extensions.DeploymentCondition{condProgressing()}}, + cond: condProgressing2(), + + expectedStatus: &extensions.DeploymentStatus{Conditions: []extensions.DeploymentCondition{condProgressing2()}}, + }, + } + + for _, test := range tests { + SetDeploymentCondition(test.status, test.cond) 
+ if !reflect.DeepEqual(test.status, test.expectedStatus) { + t.Errorf("%s: expected status: %v, got: %v", test.name, test.expectedStatus, test.status) + } + } +} + +func TestRemoveCondition(t *testing.T) { + tests := []struct { + name string + + status *extensions.DeploymentStatus + condType extensions.DeploymentConditionType + + expectedStatus *extensions.DeploymentStatus + }{ + { + name: "remove from empty status", + + status: &extensions.DeploymentStatus{}, + condType: extensions.DeploymentProgressing, + + expectedStatus: &extensions.DeploymentStatus{}, + }, + { + name: "simple remove", + + status: &extensions.DeploymentStatus{Conditions: []extensions.DeploymentCondition{condProgressing()}}, + condType: extensions.DeploymentProgressing, + + expectedStatus: &extensions.DeploymentStatus{}, + }, + { + name: "doesn't remove anything", + + status: status(), + condType: extensions.DeploymentReplicaFailure, + + expectedStatus: status(), + }, + } + + for _, test := range tests { + RemoveDeploymentCondition(test.status, test.condType) + if !reflect.DeepEqual(test.status, test.expectedStatus) { + t.Errorf("%s: expected status: %v, got: %v", test.name, test.expectedStatus, test.status) + } + } +} + +func TestDeploymentComplete(t *testing.T) { + deployment := func(desired, current, updated, available, maxUnavailable int32) *extensions.Deployment { + return &extensions.Deployment{ + Spec: extensions.DeploymentSpec{ + Replicas: desired, + Strategy: extensions.DeploymentStrategy{ + RollingUpdate: &extensions.RollingUpdateDeployment{ + MaxUnavailable: intstr.FromInt(int(maxUnavailable)), + }, + Type: extensions.RollingUpdateDeploymentStrategyType, + }, + }, + Status: extensions.DeploymentStatus{ + Replicas: current, + UpdatedReplicas: updated, + AvailableReplicas: available, + }, + } + } + + tests := []struct { + name string + + d *extensions.Deployment + + expected bool + }{ + { + name: "complete", + + d: deployment(5, 5, 5, 4, 1), + expected: true, + }, + { + name: "not complete", + + d: deployment(5, 5, 5, 3, 1), + expected: false, + }, + { + name: "complete #2", + + d: deployment(5, 5, 5, 5, 0), + expected: true, + }, + { + name: "not complete #2", + + d: deployment(5, 5, 4, 5, 0), + expected: false, + }, + } + + for _, test := range tests { + t.Log(test.name) + + if got, exp := DeploymentComplete(test.d, &test.d.Status), test.expected; got != exp { + t.Errorf("expected complete: %t, got: %t", exp, got) + } + } +} + +func TestDeploymentProgressing(t *testing.T) { + deployment := func(current, updated int32) *extensions.Deployment { + return &extensions.Deployment{ + Status: extensions.DeploymentStatus{ + Replicas: current, + UpdatedReplicas: updated, + }, + } + } + newStatus := func(current, updated int32) extensions.DeploymentStatus { + return extensions.DeploymentStatus{ + Replicas: current, + UpdatedReplicas: updated, + } + } + + tests := []struct { + name string + + d *extensions.Deployment + newStatus extensions.DeploymentStatus + + expected bool + }{ + { + name: "progressing", + + d: deployment(10, 4), + newStatus: newStatus(10, 6), + + expected: true, + }, + { + name: "not progressing", + + d: deployment(10, 4), + newStatus: newStatus(10, 4), + + expected: false, + }, + { + name: "progressing #2", + + d: deployment(10, 4), + newStatus: newStatus(8, 4), + + expected: true, + }, + { + name: "not progressing #2", + + d: deployment(10, 7), + newStatus: newStatus(10, 6), + + expected: false, + }, + { + name: "progressing #3", + + d: deployment(10, 4), + newStatus: newStatus(8, 8), + + expected: 
true, + }, + { + name: "not progressing #2", + + d: deployment(10, 7), + newStatus: newStatus(10, 7), + + expected: false, + }, + } + + for _, test := range tests { + t.Log(test.name) + + if got, exp := DeploymentProgressing(test.d, &test.newStatus), test.expected; got != exp { + t.Errorf("expected progressing: %t, got: %t", exp, got) + } + } +} + +func TestDeploymentTimedOut(t *testing.T) { + var ( + null *int32 + ten = int32(10) + ) + + timeFn := func(min, sec int) time.Time { + return time.Date(2016, 1, 1, 0, min, sec, 0, time.UTC) + } + deployment := func(condType extensions.DeploymentConditionType, status api.ConditionStatus, pds *int32, from time.Time) extensions.Deployment { + return extensions.Deployment{ + Spec: extensions.DeploymentSpec{ + ProgressDeadlineSeconds: pds, + }, + Status: extensions.DeploymentStatus{ + Conditions: []extensions.DeploymentCondition{ + { + Type: condType, + Status: status, + LastTransitionTime: unversioned.Time{Time: from}, + }, + }, + }, + } + } + + tests := []struct { + name string + + d extensions.Deployment + nowFn func() time.Time + + expected bool + }{ + { + name: "no progressDeadlineSeconds specified - no timeout", + + d: deployment(extensions.DeploymentProgressing, api.ConditionTrue, null, timeFn(1, 9)), + nowFn: func() time.Time { return timeFn(1, 20) }, + expected: false, + }, + { + name: "progressDeadlineSeconds: 10s, now - started => 00:01:20 - 00:01:09 => 11s", + + d: deployment(extensions.DeploymentProgressing, api.ConditionTrue, &ten, timeFn(1, 9)), + nowFn: func() time.Time { return timeFn(1, 20) }, + expected: true, + }, + { + name: "progressDeadlineSeconds: 10s, now - started => 00:01:20 - 00:01:11 => 9s", + + d: deployment(extensions.DeploymentProgressing, api.ConditionTrue, &ten, timeFn(1, 11)), + nowFn: func() time.Time { return timeFn(1, 20) }, + expected: false, + }, + } + + for _, test := range tests { + t.Log(test.name) + + nowFn = test.nowFn + if got, exp := DeploymentTimedOut(&test.d, &test.d.Status), test.expected; got != exp { + t.Errorf("expected timeout: %t, got: %t", exp, got) + } + } +} From de8214ad4d7bd7a269dc80ee526a1e35731803e5 Mon Sep 17 00:00:00 2001 From: Michail Kargakis Date: Thu, 15 Sep 2016 17:58:19 +0200 Subject: [PATCH 2/3] test: e2e tests for perma-failed deployments --- test/e2e/deployment.go | 185 ++++++++++++++++++++++++++++++++++++- test/e2e/framework/util.go | 17 ++++ test/test_owners.csv | 2 + 3 files changed, 200 insertions(+), 4 deletions(-) diff --git a/test/e2e/deployment.go b/test/e2e/deployment.go index f1d7677e30e..7c5cece3428 100644 --- a/test/e2e/deployment.go +++ b/test/e2e/deployment.go @@ -18,6 +18,7 @@ package e2e import ( "fmt" + "math/rand" "strings" "time" @@ -95,6 +96,12 @@ var _ = framework.KubeDescribe("Deployment", func() { It("overlapping deployment should not fight with each other", func() { testOverlappingDeployment(f) }) + It("lack of progress should be reported in the deployment status", func() { + testFailedDeployment(f) + }) + It("iterative rollouts should eventually progress", func() { + testIterativeDeployments(f) + }) // TODO: add tests that cover deployment.Spec.MinReadySeconds once we solved clock-skew issues // See https://github.com/kubernetes/kubernetes/issues/29229 }) @@ -418,8 +425,8 @@ func testRollingUpdateDeploymentEvents(f *framework.Framework) { newRS, err := deploymentutil.GetNewReplicaSet(deployment, c) Expect(err).NotTo(HaveOccurred()) Expect(newRS).NotTo(Equal(nil)) - Expect(events.Items[0].Message).Should(Equal(fmt.Sprintf("Scaled up replica set %s to 1", 
newRS.Name))) - Expect(events.Items[1].Message).Should(Equal(fmt.Sprintf("Scaled down replica set %s to 0", rsName))) + Expect(events.Items[0].Message).Should(Equal(fmt.Sprintf("Created new replica set %q and scaled up to 1", newRS.Name))) + Expect(events.Items[1].Message).Should(Equal(fmt.Sprintf("Scaled down replica set %q to 0", rsName))) } func testRecreateDeployment(f *framework.Framework) { @@ -470,8 +477,8 @@ func testRecreateDeployment(f *framework.Framework) { newRS, err := deploymentutil.GetNewReplicaSet(deployment, c) Expect(err).NotTo(HaveOccurred()) Expect(newRS).NotTo(Equal(nil)) - Expect(events.Items[0].Message).Should(Equal(fmt.Sprintf("Scaled down replica set %s to 0", rsName))) - Expect(events.Items[1].Message).Should(Equal(fmt.Sprintf("Scaled up replica set %s to 3", newRS.Name))) + Expect(events.Items[0].Message).Should(Equal(fmt.Sprintf("Scaled down replica set %q to 0", rsName))) + Expect(events.Items[1].Message).Should(Equal(fmt.Sprintf("Created new replica set %q and scaled up to 3", newRS.Name))) } // testDeploymentCleanUpPolicy tests that deployment supports cleanup policy @@ -1316,3 +1323,173 @@ func testOverlappingDeployment(f *framework.Framework) { err = framework.WaitForDeploymentRevisionAndImage(c, ns, deployOverlapping.Name, "2", redisImage) Expect(err).NotTo(HaveOccurred(), "The second deployment failed to update to revision 2") } + +func testFailedDeployment(f *framework.Framework) { + ns := f.Namespace.Name + c := f.ClientSet + + podLabels := map[string]string{"name": nginxImageName} + replicas := int32(1) + + // Create a nginx deployment. + deploymentName := "nginx" + nonExistentImage := "nginx:not-there" + thirty := int32(30) + d := newDeployment(deploymentName, replicas, podLabels, nginxImageName, nonExistentImage, extensions.RecreateDeploymentStrategyType, nil) + d.Spec.ProgressDeadlineSeconds = &thirty + + framework.Logf("Creating deployment %q with progressDeadlineSeconds set to %ds and a non-existent image", deploymentName, thirty) + deployment, err := c.Extensions().Deployments(ns).Create(d) + Expect(err).NotTo(HaveOccurred()) + + framework.Logf("Waiting for deployment %q to be observed by the controller", deploymentName) + Expect(framework.WaitForObservedDeployment(c, ns, deploymentName, deployment.Generation)).NotTo(HaveOccurred()) + + framework.Logf("Waiting for deployment %q status", deploymentName) + Expect(framework.WaitForDeploymentStatus(c, deployment)).NotTo(HaveOccurred()) + + framework.Logf("Checking deployment %q for a timeout condition", deploymentName) + Expect(framework.WaitForDeploymentWithCondition(c, ns, deploymentName, deploymentutil.TimedOutReason, extensions.DeploymentProgressing)).NotTo(HaveOccurred()) + + framework.Logf("Updating deployment %q with a good image", deploymentName) + deployment, err = framework.UpdateDeploymentWithRetries(c, ns, deployment.Name, func(update *extensions.Deployment) { + update.Spec.Template.Spec.Containers[0].Image = nginxImage + }) + Expect(err).NotTo(HaveOccurred()) + + framework.Logf("Waiting for deployment %q to be observed by the controller", deploymentName) + Expect(framework.WaitForObservedDeployment(c, ns, deploymentName, deployment.Generation)).NotTo(HaveOccurred()) + + framework.Logf("Waiting for deployment %q status", deploymentName) + Expect(framework.WaitForDeploymentStatus(c, deployment)).NotTo(HaveOccurred()) + + framework.Logf("Checking deployment %q for a complete condition", deploymentName) + Expect(framework.WaitForDeploymentWithCondition(c, ns, deploymentName, 
deploymentutil.NewRSAvailableReason, extensions.DeploymentProgressing)).NotTo(HaveOccurred()) +} + +func randomScale(d *extensions.Deployment, i int) { + switch r := rand.Float32(); { + case r < 0.3: + framework.Logf("%02d: scaling up", i) + d.Spec.Replicas++ + case r < 0.6: + if d.Spec.Replicas > 1 { + framework.Logf("%02d: scaling down", i) + d.Spec.Replicas-- + } + } +} + +func testIterativeDeployments(f *framework.Framework) { + ns := f.Namespace.Name + c := f.ClientSet + + podLabels := map[string]string{"name": nginxImageName} + replicas := int32(6) + zero := int64(0) + two := int32(2) + + // Create a nginx deployment. + deploymentName := "nginx" + thirty := int32(30) + d := newDeployment(deploymentName, replicas, podLabels, nginxImageName, nginxImage, extensions.RollingUpdateDeploymentStrategyType, nil) + d.Spec.ProgressDeadlineSeconds = &thirty + d.Spec.RevisionHistoryLimit = &two + d.Spec.Template.Spec.TerminationGracePeriodSeconds = &zero + framework.Logf("Creating deployment %q", deploymentName) + deployment, err := c.Extensions().Deployments(ns).Create(d) + Expect(err).NotTo(HaveOccurred()) + + iterations := 20 + for i := 0; i < iterations; i++ { + if r := rand.Float32(); r < 0.6 { + time.Sleep(time.Duration(float32(i) * r * float32(time.Second))) + } + + switch n := rand.Float32(); { + case n < 0.2: + // trigger a new deployment + framework.Logf("%02d: triggering a new rollout for deployment %q", i, deployment.Name) + deployment, err = framework.UpdateDeploymentWithRetries(c, ns, deployment.Name, func(update *extensions.Deployment) { + newEnv := api.EnvVar{Name: "A", Value: fmt.Sprintf("%d", i)} + update.Spec.Template.Spec.Containers[0].Env = append(update.Spec.Template.Spec.Containers[0].Env, newEnv) + randomScale(update, i) + }) + Expect(err).NotTo(HaveOccurred()) + + case n < 0.4: + // rollback to the previous version + framework.Logf("%02d: rolling back a rollout for deployment %q", i, deployment.Name) + deployment, err = framework.UpdateDeploymentWithRetries(c, ns, deployment.Name, func(update *extensions.Deployment) { + rollbackTo := &extensions.RollbackConfig{Revision: 0} + update.Spec.RollbackTo = rollbackTo + }) + Expect(err).NotTo(HaveOccurred()) + + case n < 0.6: + // just scaling + framework.Logf("%02d: scaling deployment %q", i, deployment.Name) + deployment, err = framework.UpdateDeploymentWithRetries(c, ns, deployment.Name, func(update *extensions.Deployment) { + randomScale(update, i) + }) + Expect(err).NotTo(HaveOccurred()) + + case n < 0.8: + // toggling the deployment + if deployment.Spec.Paused { + framework.Logf("%02d: pausing deployment %q", i, deployment.Name) + deployment, err = framework.UpdateDeploymentWithRetries(c, ns, deployment.Name, func(update *extensions.Deployment) { + update.Spec.Paused = true + randomScale(update, i) + }) + Expect(err).NotTo(HaveOccurred()) + } else { + framework.Logf("%02d: resuming deployment %q", i, deployment.Name) + deployment, err = framework.UpdateDeploymentWithRetries(c, ns, deployment.Name, func(update *extensions.Deployment) { + update.Spec.Paused = false + randomScale(update, i) + }) + Expect(err).NotTo(HaveOccurred()) + } + + default: + // arbitrarily delete deployment pods + framework.Logf("%02d: arbitrarily deleting one or more deployment pods for deployment %q", i, deployment.Name) + selector, err := unversioned.LabelSelectorAsSelector(deployment.Spec.Selector) + Expect(err).NotTo(HaveOccurred()) + opts := api.ListOptions{LabelSelector: selector} + podList, err := c.Core().Pods(ns).List(opts) + 
Expect(err).NotTo(HaveOccurred()) + if len(podList.Items) == 0 { + framework.Logf("%02d: no deployment pods to delete", i) + continue + } + for p := range podList.Items { + if rand.Float32() < 0.5 { + continue + } + name := podList.Items[p].Name + framework.Logf("%02d: deleting deployment pod %q", i, name) + Expect(c.Core().Pods(ns).Delete(name, nil)).NotTo(HaveOccurred()) + } + } + } + + // unpause the deployment if we end up pausing it + deployment, err = c.Extensions().Deployments(ns).Get(deployment.Name) + Expect(err).NotTo(HaveOccurred()) + if deployment.Spec.Paused { + deployment, err = framework.UpdateDeploymentWithRetries(c, ns, deployment.Name, func(update *extensions.Deployment) { + update.Spec.Paused = false + }) + } + + framework.Logf("Waiting for deployment %q to be observed by the controller", deploymentName) + Expect(framework.WaitForObservedDeployment(c, ns, deploymentName, deployment.Generation)).NotTo(HaveOccurred()) + + framework.Logf("Waiting for deployment %q status", deploymentName) + Expect(framework.WaitForDeploymentStatusValid(c, deployment)).NotTo(HaveOccurred()) + + framework.Logf("Checking deployment %q for a complete condition", deploymentName) + Expect(framework.WaitForDeploymentWithCondition(c, ns, deploymentName, deploymentutil.NewRSAvailableReason, extensions.DeploymentProgressing)).NotTo(HaveOccurred()) +} diff --git a/test/e2e/framework/util.go b/test/e2e/framework/util.go index 2c98f78d617..2eea6f42f37 100644 --- a/test/e2e/framework/util.go +++ b/test/e2e/framework/util.go @@ -3173,6 +3173,23 @@ func WaitForObservedDeployment(c clientset.Interface, ns, deploymentName string, return deploymentutil.WaitForObservedDeployment(func() (*extensions.Deployment, error) { return c.Extensions().Deployments(ns).Get(deploymentName) }, desiredGeneration, Poll, 1*time.Minute) } +func WaitForDeploymentWithCondition(c clientset.Interface, ns, deploymentName, reason string, condType extensions.DeploymentConditionType) error { + var conditions []extensions.DeploymentCondition + pollErr := wait.PollImmediate(time.Second, 1*time.Minute, func() (bool, error) { + deployment, err := c.Extensions().Deployments(ns).Get(deploymentName) + if err != nil { + return false, err + } + conditions = deployment.Status.Conditions + cond := deploymentutil.GetDeploymentCondition(deployment.Status, condType) + return cond != nil && cond.Reason == reason, nil + }) + if pollErr == wait.ErrWaitTimeout { + pollErr = fmt.Errorf("deployment %q never updated with the desired condition and reason: %v", deploymentName, conditions) + } + return pollErr +} + func logPodsOfDeployment(c clientset.Interface, deployment *extensions.Deployment) { minReadySeconds := deployment.Spec.MinReadySeconds podList, err := deploymentutil.ListPods(deployment, diff --git a/test/test_owners.csv b/test/test_owners.csv index 1abf7a569b2..6b77a458d95 100644 --- a/test/test_owners.csv +++ b/test/test_owners.csv @@ -63,6 +63,8 @@ Deployment deployment should label adopted RSs and pods,pwittrock,0 Deployment deployment should support rollback,pwittrock,0 Deployment deployment should support rollback when there's replica set with no revision,pwittrock,0 Deployment deployment should support rollover,pwittrock,0 +Deployment iterative rollouts should eventually progress,kargakis,0 +Deployment lack of progress should be reported in the deployment status,kargakis,0 Deployment overlapping deployment should not fight with each other,kargakis,1 Deployment paused deployment should be able to scale,kargakis,1 Deployment paused deployment 
should be ignored by the controller,kargakis,0 From f52ea8fc6788c606352eb133df0e771f583b2cee Mon Sep 17 00:00:00 2001 From: Michail Kargakis Date: Fri, 4 Nov 2016 13:00:37 +0100 Subject: [PATCH 3/3] Update replica annotations every time they are out of sync --- pkg/controller/deployment/sync.go | 52 ++++++++++++++++---------- pkg/controller/deployment/sync_test.go | 44 +++++++++++++++++----- 2 files changed, 67 insertions(+), 29 deletions(-) diff --git a/pkg/controller/deployment/sync.go b/pkg/controller/deployment/sync.go index 3880b5470ba..85654cc57d5 100644 --- a/pkg/controller/deployment/sync.go +++ b/pkg/controller/deployment/sync.go @@ -438,31 +438,35 @@ func (dc *DeploymentController) scale(deployment *extensions.Deployment, newRS * // drives what happens in case we are trying to scale replica sets of the same size. // In such a case when scaling up, we should scale up newer replica sets first, and // when scaling down, we should scale down older replica sets first. - scalingOperation := "up" + var scalingOperation string switch { case deploymentReplicasToAdd > 0: sort.Sort(controller.ReplicaSetsBySizeNewer(allRSs)) + scalingOperation = "up" case deploymentReplicasToAdd < 0: sort.Sort(controller.ReplicaSetsBySizeOlder(allRSs)) scalingOperation = "down" - - default: /* deploymentReplicasToAdd == 0 */ - // Nothing to add. - return nil } // Iterate over all active replica sets and estimate proportions for each of them. // The absolute value of deploymentReplicasAdded should never exceed the absolute // value of deploymentReplicasToAdd. deploymentReplicasAdded := int32(0) + nameToSize := make(map[string]int32) for i := range allRSs { rs := allRSs[i] - proportion := deploymentutil.GetProportion(rs, *deployment, deploymentReplicasToAdd, deploymentReplicasAdded) + // Estimate proportions if we have replicas to add, otherwise simply populate + // nameToSize with the current sizes for each replica set. + if deploymentReplicasToAdd != 0 { + proportion := deploymentutil.GetProportion(rs, *deployment, deploymentReplicasToAdd, deploymentReplicasAdded) - rs.Spec.Replicas += proportion - deploymentReplicasAdded += proportion + nameToSize[rs.Name] = rs.Spec.Replicas + proportion + deploymentReplicasAdded += proportion + } else { + nameToSize[rs.Name] = rs.Spec.Replicas + } } // Update all replica sets @@ -470,15 +474,16 @@ func (dc *DeploymentController) scale(deployment *extensions.Deployment, newRS * rs := allRSs[i] // Add/remove any leftovers to the largest replica set. - if i == 0 { + if i == 0 && deploymentReplicasToAdd != 0 { leftover := deploymentReplicasToAdd - deploymentReplicasAdded - rs.Spec.Replicas += leftover - if rs.Spec.Replicas < 0 { - rs.Spec.Replicas = 0 + nameToSize[rs.Name] = nameToSize[rs.Name] + leftover + if nameToSize[rs.Name] < 0 { + nameToSize[rs.Name] = 0 } } - if _, err := dc.scaleReplicaSet(rs, rs.Spec.Replicas, deployment, scalingOperation); err != nil { + // TODO: Use transactions when we have them. + if _, err := dc.scaleReplicaSet(rs, nameToSize[rs.Name], deployment, scalingOperation); err != nil { // Return as soon as we fail, the deployment is requeued return err } @@ -503,12 +508,21 @@ func (dc *DeploymentController) scaleReplicaSetAndRecordEvent(rs *extensions.Rep } func (dc *DeploymentController) scaleReplicaSet(rs *extensions.ReplicaSet, newScale int32, deployment *extensions.Deployment, scalingOperation string) (*extensions.ReplicaSet, error) { - // NOTE: This mutates the ReplicaSet passed in. Not sure if that's a good idea. 
- rs.Spec.Replicas = newScale - deploymentutil.SetReplicasAnnotations(rs, deployment.Spec.Replicas, deployment.Spec.Replicas+deploymentutil.MaxSurge(*deployment)) - rs, err := dc.client.Extensions().ReplicaSets(rs.Namespace).Update(rs) - if err == nil { - dc.eventRecorder.Eventf(deployment, api.EventTypeNormal, "ScalingReplicaSet", "Scaled %s replica set %q to %d", scalingOperation, rs.Name, newScale) + objCopy, err := api.Scheme.Copy(rs) + if err != nil { + return nil, err + } + rsCopy := objCopy.(*extensions.ReplicaSet) + + sizeNeedsUpdate := rsCopy.Spec.Replicas != newScale + annotationsNeedUpdate := deploymentutil.SetReplicasAnnotations(rsCopy, deployment.Spec.Replicas, deployment.Spec.Replicas+deploymentutil.MaxSurge(*deployment)) + + if sizeNeedsUpdate || annotationsNeedUpdate { + rsCopy.Spec.Replicas = newScale + rs, err = dc.client.Extensions().ReplicaSets(rsCopy.Namespace).Update(rsCopy) + if err == nil && sizeNeedsUpdate { + dc.eventRecorder.Eventf(deployment, api.EventTypeNormal, "ScalingReplicaSet", "Scaled %s replica set %q to %d", scalingOperation, rs.Name, newScale) + } } return rs, err } diff --git a/pkg/controller/deployment/sync_test.go b/pkg/controller/deployment/sync_test.go index 83704059a01..42c9b23de32 100644 --- a/pkg/controller/deployment/sync_test.go +++ b/pkg/controller/deployment/sync_test.go @@ -24,6 +24,7 @@ import ( "k8s.io/kubernetes/pkg/apis/extensions" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" "k8s.io/kubernetes/pkg/client/record" + testclient "k8s.io/kubernetes/pkg/client/testing/core" "k8s.io/kubernetes/pkg/controller" deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util" "k8s.io/kubernetes/pkg/controller/informers" @@ -54,8 +55,9 @@ func TestScale(t *testing.T) { newRS *extensions.ReplicaSet oldRSs []*extensions.ReplicaSet - expectedNew *extensions.ReplicaSet - expectedOld []*extensions.ReplicaSet + expectedNew *extensions.ReplicaSet + expectedOld []*extensions.ReplicaSet + wasntUpdated map[string]bool desiredReplicasAnnotations map[string]int32 }{ @@ -193,8 +195,9 @@ func TestScale(t *testing.T) { newRS: rs("foo-v3", 0, nil, newTimestamp), oldRSs: []*extensions.ReplicaSet{rs("foo-v2", 0, nil, oldTimestamp), rs("foo-v1", 0, nil, olderTimestamp)}, - expectedNew: rs("foo-v3", 6, nil, newTimestamp), - expectedOld: []*extensions.ReplicaSet{rs("foo-v2", 0, nil, oldTimestamp), rs("foo-v1", 0, nil, olderTimestamp)}, + expectedNew: rs("foo-v3", 6, nil, newTimestamp), + expectedOld: []*extensions.ReplicaSet{rs("foo-v2", 0, nil, oldTimestamp), rs("foo-v1", 0, nil, olderTimestamp)}, + wasntUpdated: map[string]bool{"foo-v2": true, "foo-v1": true}, }, // Scenario: deployment.spec.replicas == 3 ( foo-v1.spec.replicas == foo-v2.spec.replicas == foo-v3.spec.replicas == 1 ) // Deployment is scaled to 5. 
foo-v3.spec.replicas and foo-v2.spec.replicas should increment by 1 but foo-v2 fails to @@ -207,8 +210,9 @@ func TestScale(t *testing.T) { newRS: rs("foo-v3", 2, nil, newTimestamp), oldRSs: []*extensions.ReplicaSet{rs("foo-v2", 1, nil, oldTimestamp), rs("foo-v1", 1, nil, olderTimestamp)}, - expectedNew: rs("foo-v3", 2, nil, newTimestamp), - expectedOld: []*extensions.ReplicaSet{rs("foo-v2", 2, nil, oldTimestamp), rs("foo-v1", 1, nil, olderTimestamp)}, + expectedNew: rs("foo-v3", 2, nil, newTimestamp), + expectedOld: []*extensions.ReplicaSet{rs("foo-v2", 2, nil, oldTimestamp), rs("foo-v1", 1, nil, olderTimestamp)}, + wasntUpdated: map[string]bool{"foo-v3": true, "foo-v1": true}, desiredReplicasAnnotations: map[string]int32{"foo-v2": int32(3)}, }, @@ -279,8 +283,28 @@ func TestScale(t *testing.T) { t.Errorf("%s: unexpected error: %v", test.name, err) continue } - if test.expectedNew != nil && test.newRS != nil && test.expectedNew.Spec.Replicas != test.newRS.Spec.Replicas { - t.Errorf("%s: expected new replicas: %d, got: %d", test.name, test.expectedNew.Spec.Replicas, test.newRS.Spec.Replicas) + + // Construct the nameToSize map that will hold all the sizes we got out of the tests. + // Skip updating the map if the replica set wasn't updated, since there will be + // no update action for it. + nameToSize := make(map[string]int32) + if test.newRS != nil { + nameToSize[test.newRS.Name] = test.newRS.Spec.Replicas + } + for i := range test.oldRSs { + rs := test.oldRSs[i] + nameToSize[rs.Name] = rs.Spec.Replicas + } + // Get all the UPDATE actions and update nameToSize with all the updated sizes. + for _, action := range fake.Actions() { + rs := action.(testclient.UpdateAction).GetObject().(*extensions.ReplicaSet) + if !test.wasntUpdated[rs.Name] { + nameToSize[rs.Name] = rs.Spec.Replicas + } + } + + if test.expectedNew != nil && test.newRS != nil && test.expectedNew.Spec.Replicas != nameToSize[test.newRS.Name] { + t.Errorf("%s: expected new replicas: %d, got: %d", test.name, test.expectedNew.Spec.Replicas, nameToSize[test.newRS.Name]) continue } if len(test.expectedOld) != len(test.oldRSs) { @@ -290,8 +314,8 @@ func TestScale(t *testing.T) { for n := range test.oldRSs { rs := test.oldRSs[n] expected := test.expectedOld[n] - if expected.Spec.Replicas != nameToSize[rs.Name] { - t.Errorf("%s: expected old (%s) replicas: %d, got: %d", test.name, rs.Name, expected.Spec.Replicas, rs.Spec.Replicas) + if expected.Spec.Replicas != nameToSize[rs.Name] { + t.Errorf("%s: expected old (%s) replicas: %d, got: %d", test.name, rs.Name, expected.Spec.Replicas, nameToSize[rs.Name]) } } }
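
To make the proportional-scaling bookkeeping in the scale() hunk above easier to follow, here is a small self-contained sketch of what the nameToSize map ends up holding. The replica-set names and sizes are hypothetical, and the proportion below is a deliberately simplified stand-in for deploymentutil.GetProportion; the point is only that each replica set receives roughly its share of deploymentReplicasToAdd and that the rounding leftover lands on the first (largest) replica set, as in the patch.

// Hypothetical illustration of distributing deploymentReplicasToAdd across replica
// sets by proportion, with the rounding leftover handed to the largest replica set.
package main

import "fmt"

type rsSize struct {
	name     string
	replicas int32
}

func main() {
	// Deployment scaled from 10 to 15 replicas, i.e. deploymentReplicasToAdd = +5.
	toAdd := int32(5)
	total := int32(10)
	// Already sorted the way the controller sorts for a scale-up: biggest
	// (and, on ties, newest) replica set first.
	sets := []rsSize{{"foo-v3", 6}, {"foo-v2", 3}, {"foo-v1", 1}}

	added := int32(0)
	nameToSize := map[string]int32{}
	for _, rs := range sets {
		// Simplified proportion: each replica set grows relative to its share of
		// the current total, rounded toward zero.
		proportion := toAdd * rs.replicas / total
		nameToSize[rs.name] = rs.replicas + proportion
		added += proportion
	}
	// Whatever is left over after rounding goes to the first entry, mirroring the
	// "Add/remove any leftovers to the largest replica set" branch in the patch.
	nameToSize[sets[0].name] += toAdd - added

	for _, rs := range sets {
		fmt.Printf("%s: %d -> %d\n", rs.name, rs.replicas, nameToSize[rs.name])
	}
	// Output:
	// foo-v3: 6 -> 10
	// foo-v2: 3 -> 4
	// foo-v1: 1 -> 1
}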
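The rewritten scaleReplicaSet() above boils down to a copy-then-diff-then-update pattern: deep-copy the object, work out whether the size or the replicas annotations actually drifted, and only then call the API, emitting a scaling event only for a real size change. Below is a minimal sketch of that pattern under stated assumptions: the replicaSet and trackedUpdater types and the "desired-replicas" key are hypothetical stand-ins for api.Scheme.Copy, the ReplicaSets client, and deploymentutil.SetReplicasAnnotations, not the real implementations.

package main

import "fmt"

// Hypothetical, simplified object; the real code works on *extensions.ReplicaSet.
type replicaSet struct {
	Name        string
	Replicas    int32
	Annotations map[string]string
}

// trackedUpdater is a stand-in for the ReplicaSets client; it counts the updates it
// receives so the effect of skipping no-op calls is visible.
type trackedUpdater struct{ updates int }

func (u *trackedUpdater) Update(rs *replicaSet) (*replicaSet, error) {
	u.updates++
	return rs, nil
}

func scaleReplicaSet(client *trackedUpdater, rs *replicaSet, newScale, desired int32) (*replicaSet, error) {
	// Work on a copy so the shared (cached) object is never mutated in place.
	rsCopy := &replicaSet{Name: rs.Name, Replicas: rs.Replicas, Annotations: map[string]string{}}
	for k, v := range rs.Annotations {
		rsCopy.Annotations[k] = v
	}

	sizeNeedsUpdate := rsCopy.Replicas != newScale
	annotationsNeedUpdate := rsCopy.Annotations["desired-replicas"] != fmt.Sprint(desired)
	if annotationsNeedUpdate {
		rsCopy.Annotations["desired-replicas"] = fmt.Sprint(desired)
	}
	if !sizeNeedsUpdate && !annotationsNeedUpdate {
		// Nothing drifted: no API call, no scaling event.
		return rs, nil
	}
	rsCopy.Replicas = newScale
	updated, err := client.Update(rsCopy)
	if err == nil && sizeNeedsUpdate {
		// Only a genuine size change is worth an event, as in the hunk above.
		fmt.Printf("ScalingReplicaSet: scaled replica set %q to %d\n", rsCopy.Name, newScale)
	}
	return updated, err
}

func main() {
	client := &trackedUpdater{}
	rs := &replicaSet{Name: "foo-v2", Replicas: 3, Annotations: map[string]string{"desired-replicas": "3"}}

	// The first call changes the size, so it issues one update and one event.
	rs, _ = scaleReplicaSet(client, rs, 5, 5)
	// The second call finds nothing out of sync and is skipped entirely.
	rs, _ = scaleReplicaSet(client, rs, 5, 5)

	fmt.Println("updates issued:", client.updates) // updates issued: 1
}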
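Because scaleReplicaSet() no longer mutates the replica set it is handed, the reworked TestScale above recovers the resulting sizes from the fake client's recorded update actions rather than from the fixtures. The loop below is that idea in isolation; it is a sketch that assumes it runs inside TestScale, with test, fake, testclient, and extensions already in scope exactly as in the diff.

// Assumed to run inside TestScale: "test" is the current table-driven case and
// "fake" is the fake clientset the controller under test was wired to.
nameToSize := make(map[string]int32)
// Seed the map with the sizes the fixtures were built with ...
if test.newRS != nil {
	nameToSize[test.newRS.Name] = test.newRS.Spec.Replicas
}
for _, rs := range test.oldRSs {
	nameToSize[rs.Name] = rs.Spec.Replicas
}
// ... then overlay whatever the controller actually sent to the API server, skipping
// replica sets the case marks as not updated (they produce no update action at all).
for _, action := range fake.Actions() {
	update, ok := action.(testclient.UpdateAction)
	if !ok {
		continue
	}
	rs, ok := update.GetObject().(*extensions.ReplicaSet)
	if !ok || test.wasntUpdated[rs.Name] {
		continue
	}
	nameToSize[rs.Name] = rs.Spec.Replicas
}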