diff --git a/pkg/controller/deployment/BUILD b/pkg/controller/deployment/BUILD
index 41a506c359a..c5607e18ead 100644
--- a/pkg/controller/deployment/BUILD
+++ b/pkg/controller/deployment/BUILD
@@ -54,6 +54,7 @@ go_test(
     name = "go_default_test",
     srcs = [
         "deployment_controller_test.go",
+        "progress_test.go",
         "recreate_test.go",
         "rolling_test.go",
         "sync_test.go",
@@ -75,6 +76,7 @@ go_test(
         "//vendor:k8s.io/apimachinery/pkg/util/uuid",
         "//vendor:k8s.io/client-go/testing",
         "//vendor:k8s.io/client-go/tools/record",
+        "//vendor:k8s.io/client-go/util/workqueue",
     ],
 )
 
diff --git a/pkg/controller/deployment/deployment_controller.go b/pkg/controller/deployment/deployment_controller.go
index 94bfc02e3e4..c551d190769 100644
--- a/pkg/controller/deployment/deployment_controller.go
+++ b/pkg/controller/deployment/deployment_controller.go
@@ -94,8 +94,6 @@ type DeploymentController struct {
 
 	// Deployments that need to be synced
 	queue workqueue.RateLimitingInterface
-	// Deployments that need to be checked for progress.
-	progressQueue workqueue.RateLimitingInterface
 }
 
 // NewDeploymentController creates a new DeploymentController.
@@ -112,7 +110,6 @@ func NewDeploymentController(dInformer extensionsinformers.DeploymentInformer, r
 		client:        client,
 		eventRecorder: eventBroadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: "deployment-controller"}),
 		queue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deployment"),
-		progressQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "progress-check"),
 	}
 	dc.rsControl = controller.RealRSControl{
 		KubeClient: client,
@@ -150,7 +147,6 @@ func NewDeploymentController(dInformer extensionsinformers.DeploymentInformer, r
 func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) {
 	defer utilruntime.HandleCrash()
 	defer dc.queue.ShutDown()
-	defer dc.progressQueue.ShutDown()
 
 	glog.Infof("Starting deployment controller")
 
@@ -162,7 +158,6 @@
 	for i := 0; i < workers; i++ {
 		go wait.Until(dc.worker, time.Second, stopCh)
 	}
-	go wait.Until(dc.progressWorker, time.Second, stopCh)
 
 	<-stopCh
 	glog.Infof("Shutting down deployment controller")
@@ -357,17 +352,25 @@ func (dc *DeploymentController) enqueue(deployment *extensions.Deployment) {
 	dc.queue.Add(key)
 }
 
-// checkProgressAfter will enqueue a deployment after the provided amount of time in a secondary queue.
-// Once the deployment is popped out of the secondary queue, it is checked for progress and requeued
-// back to the main queue iff it has failed progressing.
-func (dc *DeploymentController) checkProgressAfter(deployment *extensions.Deployment, after time.Duration) {
+func (dc *DeploymentController) enqueueRateLimited(deployment *extensions.Deployment) {
 	key, err := controller.KeyFunc(deployment)
 	if err != nil {
 		utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %#v: %v", deployment, err))
 		return
 	}
 
-	dc.progressQueue.AddAfter(key, after)
+	dc.queue.AddRateLimited(key)
+}
+
+// enqueueAfter will enqueue a deployment after the provided amount of time.
+func (dc *DeploymentController) enqueueAfter(deployment *extensions.Deployment, after time.Duration) {
+	key, err := controller.KeyFunc(deployment)
+	if err != nil {
+		utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %#v: %v", deployment, err))
+		return
+	}
+
+	dc.queue.AddAfter(key, after)
 }
 
 // getDeploymentForPod returns the deployment managing the given Pod.
@@ -712,62 +715,3 @@ func (dc *DeploymentController) clearDeploymentOverlap(deployment *extensions.De
 	delete(deployment.Annotations, util.OverlapAnnotation)
 	return dc.client.Extensions().Deployments(deployment.Namespace).UpdateStatus(deployment)
 }
-
-// progressWorker runs a worker thread that pops items out of a secondary queue, checks if they
-// have failed progressing and if so it adds them back to the main queue.
-func (dc *DeploymentController) progressWorker() {
-	for dc.checkNextItemForProgress() {
-	}
-}
-
-// checkNextItemForProgress checks if a deployment has failed progressing and if so it adds it back
-// to the main queue.
-func (dc *DeploymentController) checkNextItemForProgress() bool {
-	key, quit := dc.progressQueue.Get()
-	if quit {
-		return false
-	}
-	defer dc.progressQueue.Done(key)
-
-	needsResync, err := dc.checkForProgress(key.(string))
-	if err != nil {
-		utilruntime.HandleError(err)
-	}
-	if err == nil && needsResync {
-		glog.V(2).Infof("Deployment %q has failed progressing - syncing it back to the main queue for an update", key.(string))
-		dc.queue.AddRateLimited(key)
-	}
-	dc.progressQueue.Forget(key)
-	return true
-}
-
-// checkForProgress checks the progress for the provided deployment. Meant to be called
-// by the progressWorker and work on items synced in a secondary queue.
-func (dc *DeploymentController) checkForProgress(key string) (bool, error) {
-	namespace, name, err := cache.SplitMetaNamespaceKey(key)
-	if err != nil {
-		return false, err
-	}
-	deployment, err := dc.dLister.Deployments(namespace).Get(name)
-	if errors.IsNotFound(err) {
-		return false, nil
-	}
-	if err != nil {
-		glog.V(2).Infof("Cannot retrieve deployment %q found in the secondary queue: %#v", key, err)
-		return false, err
-	}
-	cond := util.GetDeploymentCondition(deployment.Status, extensions.DeploymentProgressing)
-	// Already marked with a terminal reason - no need to add it back to the main queue.
-	if cond != nil && (cond.Reason == util.TimedOutReason || cond.Reason == util.NewRSAvailableReason) {
-		return false, nil
-	}
-	// Deep-copy otherwise we may mutate our cache.
-	// TODO: Remove deep-copying from here. This worker does not need to sync the annotations
-	// in the deployment.
-	d, err := util.DeploymentDeepCopy(deployment)
-	if err != nil {
-		return false, err
-	}
-	glog.V(2).Infof("Syncing deployment %q for a progress check", key)
-	return dc.hasFailed(d)
-}
diff --git a/pkg/controller/deployment/progress.go b/pkg/controller/deployment/progress.go
index d79f5205201..667f9fac4f3 100644
--- a/pkg/controller/deployment/progress.go
+++ b/pkg/controller/deployment/progress.go
@@ -150,21 +150,8 @@ func (dc *DeploymentController) syncRolloutStatus(allRSs []*extensions.ReplicaSe
 
 	// Do not update if there is nothing new to add.
 	if reflect.DeepEqual(d.Status, newStatus) {
-		// If there is no sign of progress at this point then there is a high chance that the
-		// deployment is stuck. We should resync this deployment at some point[1] in the future
-		// and check if it has timed out. We definitely need this, otherwise we depend on the
-		// controller resync interval. See https://github.com/kubernetes/kubernetes/issues/34458.
-		//
-		// [1] time.Now() + progressDeadlineSeconds - lastUpdateTime (of the Progressing condition).
-		if d.Spec.ProgressDeadlineSeconds != nil &&
-			!util.DeploymentComplete(d, &newStatus) &&
-			!util.DeploymentTimedOut(d, &newStatus) &&
-			currentCond != nil {
-
-			after := time.Now().Add(time.Duration(*d.Spec.ProgressDeadlineSeconds) * time.Second).Sub(currentCond.LastUpdateTime.Time)
-			glog.V(2).Infof("Queueing up deployment %q for a progress check after %ds", d.Name, int(after.Seconds()))
-			dc.checkProgressAfter(d, after)
-		}
+		// Requeue the deployment if required.
+		dc.requeueStuckDeployment(d, newStatus)
 		return nil
 	}
 
@@ -207,3 +194,48 @@ func (dc *DeploymentController) getReplicaFailures(allRSs []*extensions.ReplicaS
 	}
 	return conditions
 }
+
+// used for unit testing
+var nowFn = func() time.Time { return time.Now() }
+
+// requeueStuckDeployment checks whether the provided deployment needs to be synced for a progress
+// check. It returns the time after which the deployment will be requeued for the progress check,
+// 0 if it will be requeued now, or -1 if it does not need to be requeued.
+func (dc *DeploymentController) requeueStuckDeployment(d *extensions.Deployment, newStatus extensions.DeploymentStatus) time.Duration {
+	currentCond := util.GetDeploymentCondition(d.Status, extensions.DeploymentProgressing)
+	// Can't estimate progress if there is no deadline in the spec or no Progressing condition in the current status.
+	if d.Spec.ProgressDeadlineSeconds == nil || currentCond == nil {
+		return time.Duration(-1)
+	}
+	// No need to estimate progress if the rollout is complete or already timed out.
+	if util.DeploymentComplete(d, &newStatus) || currentCond.Reason == util.TimedOutReason {
+		return time.Duration(-1)
+	}
+	// If there is no sign of progress at this point then there is a high chance that the
+	// deployment is stuck. We should resync this deployment at some point in the future[1]
+	// and check whether it has timed out. We definitely need this, otherwise we depend on the
+	// controller resync interval. See https://github.com/kubernetes/kubernetes/issues/34458.
+	//
+	// [1] ProgressingCondition.LastUpdateTime + progressDeadlineSeconds - time.Now()
+	//
+	// For example, if a Deployment updated its Progressing condition 3 minutes ago and has a
+	// deadline of 10 minutes, it would need to be resynced for a progress check after 7 minutes.
+	//
+	// lastUpdated:             00:00:00
+	// now:                     00:03:00
+	// progressDeadlineSeconds: 600 (10 minutes)
+	//
+	// lastUpdated + progressDeadlineSeconds - now => 00:00:00 + 00:10:00 - 00:03:00 => 07:00
+	after := currentCond.LastUpdateTime.Time.Add(time.Duration(*d.Spec.ProgressDeadlineSeconds) * time.Second).Sub(nowFn())
+	// If the remaining time is less than a second, then requeue the deployment immediately.
+	// Make it rate-limited so we stay on the safe side; eventually the Deployment should
+	// transition either to a Complete or to a TimedOut condition.
+	if after < time.Second {
+		glog.V(2).Infof("Queueing up deployment %q for a progress check now", d.Name)
+		dc.enqueueRateLimited(d)
+		return time.Duration(0)
+	}
+	glog.V(2).Infof("Queueing up deployment %q for a progress check after %ds", d.Name, int(after.Seconds()))
+	dc.enqueueAfter(d, after)
+	return after
+}
diff --git a/pkg/controller/deployment/progress_test.go b/pkg/controller/deployment/progress_test.go
new file mode 100644
index 00000000000..07f120767a5
--- /dev/null
+++ b/pkg/controller/deployment/progress_test.go
@@ -0,0 +1,163 @@
+/*
+Copyright 2017 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package deployment
+
+import (
+	"testing"
+	"time"
+
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/client-go/util/workqueue"
+	"k8s.io/kubernetes/pkg/api/v1"
+	extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
+	"k8s.io/kubernetes/pkg/controller/deployment/util"
+)
+
+func newDeploymentStatus(replicas, updatedReplicas, availableReplicas int32) extensions.DeploymentStatus {
+	return extensions.DeploymentStatus{
+		Replicas:          replicas,
+		UpdatedReplicas:   updatedReplicas,
+		AvailableReplicas: availableReplicas,
+	}
+}
+
+// assumes the returned deployment is always observed - not needed to be tested here.
+func currentDeployment(pds *int32, replicas, statusReplicas, updatedReplicas, availableReplicas int32, conditions []extensions.DeploymentCondition) *extensions.Deployment {
+	d := &extensions.Deployment{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: "progress-test",
+		},
+		Spec: extensions.DeploymentSpec{
+			ProgressDeadlineSeconds: pds,
+			Replicas:                &replicas,
+			Strategy: extensions.DeploymentStrategy{
+				Type: extensions.RecreateDeploymentStrategyType,
+			},
+		},
+		Status: newDeploymentStatus(statusReplicas, updatedReplicas, availableReplicas),
+	}
+	d.Status.Conditions = conditions
+	return d
+}
+
+func TestRequeueStuckDeployment(t *testing.T) {
+	pds := int32(60)
+	failed := []extensions.DeploymentCondition{
+		{
+			Type:   extensions.DeploymentProgressing,
+			Status: v1.ConditionFalse,
+			Reason: util.TimedOutReason,
+		},
+	}
+	stuck := []extensions.DeploymentCondition{
+		{
+			Type:           extensions.DeploymentProgressing,
+			Status:         v1.ConditionTrue,
+			LastUpdateTime: metav1.Date(2017, 2, 15, 18, 49, 00, 00, time.UTC),
+		},
+	}
+
+	tests := []struct {
+		name     string
+		d        *extensions.Deployment
+		status   extensions.DeploymentStatus
+		nowFn    func() time.Time
+		expected time.Duration
+	}{
+		{
+			name:     "no progressDeadlineSeconds specified",
+			d:        currentDeployment(nil, 4, 3, 3, 2, nil),
+			status:   newDeploymentStatus(3, 3, 2),
+			expected: time.Duration(-1),
+		},
+		{
+			name:     "no progressing condition found",
+			d:        currentDeployment(&pds, 4, 3, 3, 2, nil),
+			status:   newDeploymentStatus(3, 3, 2),
+			expected: time.Duration(-1),
+		},
+		{
+			name:     "complete deployment does not need to be requeued",
+			d:        currentDeployment(&pds, 3, 3, 3, 3, nil),
+			status:   newDeploymentStatus(3, 3, 3),
+			expected: time.Duration(-1),
+		},
+		{
+			name:     "already failed deployment does not need to be requeued",
+			d:        currentDeployment(&pds, 3, 3, 3, 0, failed),
+			status:   newDeploymentStatus(3, 3, 0),
+			expected: time.Duration(-1),
+		},
+		{
+			name:     "stuck deployment - 30s",
+			d:        currentDeployment(&pds, 3, 3, 3, 1, stuck),
+			status:   newDeploymentStatus(3, 3, 1),
+			nowFn:    func() time.Time { return metav1.Date(2017, 2, 15, 18, 49, 30, 00, time.UTC).Time },
+			expected: 30 * time.Second,
+		},
+		{
+			name:     "stuck deployment - 1s",
+			d:        currentDeployment(&pds, 3, 3, 3, 1, stuck),
+			status:   newDeploymentStatus(3, 3, 1),
+			nowFn:    func() time.Time { return metav1.Date(2017, 2, 15, 18, 49, 59, 00, time.UTC).Time },
+			expected: 1 * time.Second,
+		},
+		{
+			name:     "failed deployment - less than a second => now",
+			d:        currentDeployment(&pds, 3, 3, 3, 1, stuck),
+			status:   newDeploymentStatus(3, 3, 1),
+			nowFn:    func() time.Time { return metav1.Date(2017, 2, 15, 18, 49, 59, 1, time.UTC).Time },
+			expected: time.Duration(0),
+		},
+		{
+			name:     "failed deployment - now",
+			d:        currentDeployment(&pds, 3, 3, 3, 1, stuck),
+			status:   newDeploymentStatus(3, 3, 1),
+			nowFn:    func() time.Time { return metav1.Date(2017, 2, 15, 18, 50, 00, 00, time.UTC).Time },
+			expected: time.Duration(0),
+		},
+		{
+			name:     "failed deployment - 1s after deadline",
+			d:        currentDeployment(&pds, 3, 3, 3, 1, stuck),
+			status:   newDeploymentStatus(3, 3, 1),
+			nowFn:    func() time.Time { return metav1.Date(2017, 2, 15, 18, 50, 01, 00, time.UTC).Time },
+			expected: time.Duration(0),
+		},
+		{
+			name:     "failed deployment - 60s after deadline",
+			d:        currentDeployment(&pds, 3, 3, 3, 1, stuck),
+			status:   newDeploymentStatus(3, 3, 1),
+			nowFn:    func() time.Time { return metav1.Date(2017, 2, 15, 18, 51, 00, 00, time.UTC).Time },
+			expected: time.Duration(0),
+		},
+	}
+
+	dc := &DeploymentController{
+		queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "doesnt-matter"),
+	}
+	dc.enqueueDeployment = dc.enqueue
+
+	for _, test := range tests {
+		if test.nowFn != nil {
+			nowFn = test.nowFn
+		}
+		got := dc.requeueStuckDeployment(test.d, test.status)
+		if got != test.expected {
+			t.Errorf("%s: got duration: %v, expected duration: %v", test.name, got, test.expected)
+		}
+	}
+}
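
For readers who want to see the requeue arithmetic from this change in isolation, below is a minimal, stdlib-only Go sketch of the computation requeueStuckDeployment performs (lastUpdated + progressDeadlineSeconds - now, collapsing sub-second results to "requeue now"). The requeueDelay helper is illustrative only and is not part of this patch or of the Kubernetes API.

package main

import (
	"fmt"
	"time"
)

// requeueDelay mirrors the core computation in requeueStuckDeployment:
// lastUpdated + progressDeadlineSeconds - now. Results under one second
// collapse to zero, i.e. "requeue immediately (rate-limited)".
func requeueDelay(lastUpdated time.Time, progressDeadlineSeconds int32, now time.Time) time.Duration {
	after := lastUpdated.Add(time.Duration(progressDeadlineSeconds) * time.Second).Sub(now)
	if after < time.Second {
		return 0
	}
	return after
}

func main() {
	lastUpdated := time.Date(2017, 2, 15, 18, 49, 0, 0, time.UTC)

	// Matches the worked example in the requeueStuckDeployment comment:
	// condition updated 3 minutes ago, 10-minute deadline => recheck in 7 minutes.
	fmt.Println(requeueDelay(lastUpdated, 600, lastUpdated.Add(3*time.Minute))) // 7m0s

	// Matches the "stuck deployment - 30s" test case: 60s deadline, 30s elapsed.
	fmt.Println(requeueDelay(lastUpdated, 60, lastUpdated.Add(30*time.Second))) // 30s

	// Past the deadline => requeue immediately.
	fmt.Println(requeueDelay(lastUpdated, 60, lastUpdated.Add(2*time.Minute))) // 0s
}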