diff --git a/pkg/controller/deployment/deployment_controller.go b/pkg/controller/deployment/deployment_controller.go
index e90429a1d53..26245aaff75 100644
--- a/pkg/controller/deployment/deployment_controller.go
+++ b/pkg/controller/deployment/deployment_controller.go
@@ -85,6 +85,8 @@ type DeploymentController struct {
 
 	// Deployments that need to be synced
 	queue workqueue.RateLimitingInterface
+	// Deployments that need to be checked for progress.
+	progressQueue workqueue.RateLimitingInterface
 }
 
 // NewDeploymentController creates a new DeploymentController.
@@ -101,13 +103,14 @@ func NewDeploymentController(dInformer informers.DeploymentInformer, rsInformer
 		client:        client,
 		eventRecorder: eventBroadcaster.NewRecorder(v1.EventSource{Component: "deployment-controller"}),
 		queue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deployment"),
+		progressQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "progress-check"),
 	}
 
 	dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
-		AddFunc:    dc.addDeploymentNotification,
-		UpdateFunc: dc.updateDeploymentNotification,
+		AddFunc:    dc.addDeployment,
+		UpdateFunc: dc.updateDeployment,
 		// This will enter the sync loop and no-op, because the deployment has been deleted from the store.
-		DeleteFunc: dc.deleteDeploymentNotification,
+		DeleteFunc: dc.deleteDeployment,
 	})
 	rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
 		AddFunc:    dc.addReplicaSet,
@@ -129,6 +132,7 @@ func NewDeploymentController(dInformer informers.DeploymentInformer, rsInformer
 func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) {
 	defer utilruntime.HandleCrash()
 	defer dc.queue.ShutDown()
+	defer dc.progressQueue.ShutDown()
 
 	glog.Infof("Starting deployment controller")
 
@@ -139,25 +143,26 @@ func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) {
 	for i := 0; i < workers; i++ {
 		go wait.Until(dc.worker, time.Second, stopCh)
 	}
+	go wait.Until(dc.progressWorker, time.Second, stopCh)
 
 	<-stopCh
 	glog.Infof("Shutting down deployment controller")
 }
 
-func (dc *DeploymentController) addDeploymentNotification(obj interface{}) {
+func (dc *DeploymentController) addDeployment(obj interface{}) {
 	d := obj.(*extensions.Deployment)
 	glog.V(4).Infof("Adding deployment %s", d.Name)
 	dc.enqueueDeployment(d)
 }
 
-func (dc *DeploymentController) updateDeploymentNotification(old, cur interface{}) {
+func (dc *DeploymentController) updateDeployment(old, cur interface{}) {
 	oldD := old.(*extensions.Deployment)
 	glog.V(4).Infof("Updating deployment %s", oldD.Name)
 	// Resync on deployment object relist.
 	dc.enqueueDeployment(cur.(*extensions.Deployment))
 }
 
-func (dc *DeploymentController) deleteDeploymentNotification(obj interface{}) {
+func (dc *DeploymentController) deleteDeployment(obj interface{}) {
 	d, ok := obj.(*extensions.Deployment)
 	if !ok {
 		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
@@ -266,27 +271,37 @@ func (dc *DeploymentController) enqueueDeployment(deployment *extensions.Deploym
 	dc.queue.Add(key)
 }
 
+// enqueueAfter will enqueue a deployment to a secondary queue after the provided amount of time.
+// Once the deployment is popped out of the secondary queue, it is checked for progress and requeued
+// back to the main queue iff it has failed progressing.
+func (dc *DeploymentController) enqueueAfter(deployment *extensions.Deployment, after time.Duration) {
+	key, err := controller.KeyFunc(deployment)
+	if err != nil {
+		utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %#v: %v", deployment, err))
+		return
+	}
+
+	dc.progressQueue.AddAfter(key, after)
+}
+
 // worker runs a worker thread that just dequeues items, processes them, and marks them done.
 // It enforces that the syncHandler is never invoked concurrently with the same key.
 func (dc *DeploymentController) worker() {
-	work := func() bool {
-		key, quit := dc.queue.Get()
-		if quit {
-			return true
-		}
-		defer dc.queue.Done(key)
-
-		err := dc.syncHandler(key.(string))
-		dc.handleErr(err, key)
+	for dc.processNextWorkItem() {
+	}
+}
 
+func (dc *DeploymentController) processNextWorkItem() bool {
+	key, quit := dc.queue.Get()
+	if quit {
 		return false
 	}
+	defer dc.queue.Done(key)
 
-	for {
-		if quit := work(); quit {
-			return
-		}
-	}
+	err := dc.syncHandler(key.(string))
+	dc.handleErr(err, key)
+
+	return true
 }
 
 func (dc *DeploymentController) handleErr(err error, key interface{}) {
@@ -310,6 +325,7 @@ func (dc *DeploymentController) handleErr(err error, key interface{}) {
 // This function is not meant to be invoked concurrently with the same key.
 func (dc *DeploymentController) syncDeployment(key string) error {
 	startTime := time.Now()
+	glog.V(4).Infof("Started syncing deployment %q (%v)", key, startTime)
 	defer func() {
 		glog.V(4).Infof("Finished syncing deployment %q (%v)", key, time.Now().Sub(startTime))
 	}()
@@ -453,3 +469,57 @@ func (dc *DeploymentController) clearDeploymentOverlap(deployment *extensions.De
 	delete(deployment.Annotations, util.OverlapAnnotation)
 	return dc.client.Extensions().Deployments(deployment.Namespace).UpdateStatus(deployment)
 }
+
+// progressWorker runs a worker thread that pops items out of a secondary queue, checks if they
+// have failed progressing and if so it adds them back to the main queue.
+func (dc *DeploymentController) progressWorker() {
+	for dc.checkNextItemForProgress() {
+	}
+}
+
+// checkNextItemForProgress checks if a deployment has failed progressing and if so it adds it back
+// to the main queue.
+func (dc *DeploymentController) checkNextItemForProgress() bool {
+	key, quit := dc.progressQueue.Get()
+	if quit {
+		return false
+	}
+	defer dc.progressQueue.Done(key)
+
+	needsResync, err := dc.checkForProgress(key.(string))
+	if err != nil {
+		utilruntime.HandleError(err)
+	}
+	if err == nil && needsResync {
+		dc.queue.AddRateLimited(key)
+	}
+	dc.progressQueue.Forget(key)
+	return true
+}
+
+// checkForProgress checks the progress for the provided deployment. Meant to be called
+// by the progressWorker; it works on items that have been placed in the secondary queue.
+func (dc *DeploymentController) checkForProgress(key string) (bool, error) {
+	obj, exists, err := dc.dLister.Indexer.GetByKey(key)
+	if err != nil {
+		glog.V(2).Infof("Cannot retrieve deployment %q found in the secondary queue: %#v", key, err)
+		return false, err
+	}
+	if !exists {
+		return false, nil
+	}
+	deployment := obj.(*extensions.Deployment)
+	cond := util.GetDeploymentCondition(deployment.Status, extensions.DeploymentProgressing)
+	// Already marked with a terminal reason - no need to add it back to the main queue.
+	if cond != nil && (cond.Reason == util.TimedOutReason || cond.Reason == util.NewRSAvailableReason) {
+		return false, nil
+	}
+	// Deep-copy otherwise we may mutate our cache.
+	// TODO: Remove deep-copying from here. This worker does not need to sync the annotations
+	// in the deployment.
+	d, err := util.DeploymentDeepCopy(deployment)
+	if err != nil {
+		return false, err
+	}
+	return dc.hasFailed(d)
+}
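To make the new control flow concrete, here is a minimal, self-contained sketch of the two-queue pattern the patch introduces. It imports workqueue from client-go (the tree patched above vendors the same package under pkg/util/workqueue), and the key "default/nginx" and the failedProgressing stand-in are illustrative, not part of the patch:

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	// The main queue drives full syncs; the secondary queue only schedules
	// delayed progress checks, mirroring queue and progressQueue above.
	queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deployment")
	progressQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "progress-check")
	defer queue.ShutDown()
	defer progressQueue.ShutDown()

	// enqueueAfter: the key becomes visible to Get only after the delay.
	progressQueue.AddAfter("default/nginx", 50*time.Millisecond)

	// checkNextItemForProgress: pop the key and, if the deployment has
	// failed progressing, hand it back to the main queue rate-limited.
	key, _ := progressQueue.Get()
	failedProgressing := true // stand-in for the hasFailed check
	if failedProgressing {
		queue.AddRateLimited(key)
	}
	progressQueue.Forget(key)
	progressQueue.Done(key)

	// A main-queue worker now picks the deployment up for a full resync.
	k, _ := queue.Get()
	fmt.Printf("resyncing %v\n", k)
	queue.Done(k)
}
```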
diff --git a/pkg/controller/deployment/progress.go b/pkg/controller/deployment/progress.go
index f53339b62ce..28d87912c8f 100644
--- a/pkg/controller/deployment/progress.go
+++ b/pkg/controller/deployment/progress.go
@@ -19,6 +19,9 @@ package deployment
 import (
 	"fmt"
 	"reflect"
+	"time"
+
+	"github.com/golang/glog"
 
 	"k8s.io/kubernetes/pkg/api/v1"
 	extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
@@ -138,13 +141,21 @@ func (dc *DeploymentController) syncRolloutStatus(allRSs []*extensions.ReplicaSe
 
 	// Do not update if there is nothing new to add.
 	if reflect.DeepEqual(d.Status, newStatus) {
-		// TODO: If there is no sign of progress at this point then there is a high chance that the
-		// deployment is stuck. We should resync this deployment at some point[1] in the future[2] and
-		// check if it has timed out. We definitely need this, otherwise we depend on the controller
-		// resync interval. See https://github.com/kubernetes/kubernetes/issues/34458.
+		// If there is no sign of progress at this point then there is a high chance that the
+		// deployment is stuck. We should resync this deployment at some point[1] in the future
+		// and check if it has timed out. We definitely need this, otherwise we depend on the
+		// controller resync interval. See https://github.com/kubernetes/kubernetes/issues/34458.
 		//
-		// [1] time.Now() + progressDeadlineSeconds - lastUpdateTime (of the Progressing condition).
-		// [2] Use dc.queue.AddAfter
+		// [1] lastUpdateTime (of the Progressing condition) + progressDeadlineSeconds - time.Now()
+		if d.Spec.ProgressDeadlineSeconds != nil &&
+			!util.DeploymentComplete(d, &newStatus) &&
+			!util.DeploymentTimedOut(d, &newStatus) &&
+			currentCond != nil {
+
+			after := currentCond.LastUpdateTime.Time.Add(time.Duration(*d.Spec.ProgressDeadlineSeconds) * time.Second).Sub(time.Now())
+			glog.V(4).Infof("Queueing up deployment %q for a progress check after %ds", d.Name, int(after.Seconds()))
+			dc.enqueueAfter(d, after)
+		}
 		return nil
 	}
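A worked example of the requeue delay computed above, with hypothetical numbers (progressDeadlineSeconds of 600 and a Progressing condition that last changed three minutes ago): the deployment should surface for its progress check exactly when the deadline expires, seven minutes from now.

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Hypothetical inputs for the formula in [1] above.
	progressDeadline := 600 * time.Second          // spec.progressDeadlineSeconds = 600
	lastUpdate := time.Now().Add(-3 * time.Minute) // Progressing condition last moved 3m ago

	// lastUpdateTime + progressDeadlineSeconds - now: ten minutes of budget
	// minus the three that have already elapsed.
	after := lastUpdate.Add(progressDeadline).Sub(time.Now())
	fmt.Printf("progress check in %v\n", after.Round(time.Second)) // ~7m0s
}
```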
diff --git a/pkg/controller/deployment/util/deployment_util.go b/pkg/controller/deployment/util/deployment_util.go
index 0e10baa7d35..bb129f6fd6a 100644
--- a/pkg/controller/deployment/util/deployment_util.go
+++ b/pkg/controller/deployment/util/deployment_util.go
@@ -827,12 +827,14 @@ func IsRollingUpdate(deployment *extensions.Deployment) bool {
 // updatedReplicas and it doesn't violate minimum availability.
 func DeploymentComplete(deployment *extensions.Deployment, newStatus *extensions.DeploymentStatus) bool {
 	return newStatus.UpdatedReplicas == *(deployment.Spec.Replicas) &&
-		newStatus.AvailableReplicas >= *(deployment.Spec.Replicas)-MaxUnavailable(*deployment)
+		newStatus.AvailableReplicas >= *(deployment.Spec.Replicas)-MaxUnavailable(*deployment) &&
+		newStatus.ObservedGeneration >= deployment.Generation
 }
 
 // DeploymentProgressing reports progress for a deployment. Progress is estimated by comparing the
-// current with the new status of the deployment that the controller is observing. The following
-// algorithm is already used in the kubectl rolling updater to report lack of progress.
+// current with the new status of the deployment that the controller is observing. More specifically,
+// when new pods are scaled up or become available, or old pods are scaled down, we consider the
+// deployment to be progressing.
 func DeploymentProgressing(deployment *extensions.Deployment, newStatus *extensions.DeploymentStatus) bool {
 	oldStatus := deployment.Status
 
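The new ObservedGeneration clause guards against reporting an old status as complete before the controller has seen the latest spec. A plain-integer restatement (illustrative only; deploymentComplete is a hypothetical stand-in for util.DeploymentComplete, with maxUnavailable standing in for util.MaxUnavailable(deployment)):

```go
package main

import "fmt"

// deploymentComplete restates the updated predicate: every replica updated,
// availability within maxUnavailable, and the latest spec generation observed.
func deploymentComplete(replicas, maxUnavailable, generation, updated, available, observedGen int32) bool {
	return updated == replicas &&
		available >= replicas-maxUnavailable &&
		observedGen >= generation
}

func main() {
	// All 3 replicas are updated and availability is within maxUnavailable,
	// but the status was computed from generation 1 of a generation-2 spec,
	// so the new ObservedGeneration guard keeps the deployment "incomplete".
	fmt.Println(deploymentComplete(3, 1, 2, 3, 2, 1)) // false
	// Once the controller has observed generation 2, the same counts pass.
	fmt.Println(deploymentComplete(3, 1, 2, 3, 2, 2)) // true
}
```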
@@ -840,7 +842,9 @@ func DeploymentProgressing(deployment *extensions.Deployment, newStatus *extensi
 	oldStatusOldReplicas := oldStatus.Replicas - oldStatus.UpdatedReplicas
 	newStatusOldReplicas := newStatus.Replicas - newStatus.UpdatedReplicas
 
-	return (newStatus.UpdatedReplicas > oldStatus.UpdatedReplicas) || (newStatusOldReplicas < oldStatusOldReplicas)
+	return (newStatus.UpdatedReplicas > oldStatus.UpdatedReplicas) ||
+		(newStatusOldReplicas < oldStatusOldReplicas) ||
+		newStatus.AvailableReplicas > deployment.Status.AvailableReplicas
 }
 
 // used for unit testing
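The widened DeploymentProgressing predicate now also counts newly available pods as progress. A standalone sketch (progressing is a hypothetical stand-in, with plain integers for the status fields):

```go
package main

import "fmt"

// progressing mirrors the updated predicate: progress is new pods scaled up,
// old pods scaled down, or more pods becoming available.
func progressing(oldUpdated, newUpdated, oldOld, newOld, oldAvailable, newAvailable int32) bool {
	return newUpdated > oldUpdated ||
		newOld < oldOld ||
		newAvailable > oldAvailable
}

func main() {
	// No pods were created or deleted, but one more replica became ready and
	// passed minReadySeconds: with this patch that now counts as progress,
	// so the Progressing condition's lastUpdateTime moves forward instead of
	// letting the deadline expire.
	fmt.Println(progressing(3, 3, 2, 2, 1, 2)) // true
}
```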