diff --git a/pkg/controller/deployment/deployment_controller.go b/pkg/controller/deployment/deployment_controller.go index 25786fc175a..4e1cc0206c0 100644 --- a/pkg/controller/deployment/deployment_controller.go +++ b/pkg/controller/deployment/deployment_controller.go @@ -34,6 +34,7 @@ import ( unversionedcore "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned" "k8s.io/kubernetes/pkg/client/record" "k8s.io/kubernetes/pkg/controller" + "k8s.io/kubernetes/pkg/controller/deployment/util" "k8s.io/kubernetes/pkg/controller/framework" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/util/metrics" @@ -51,6 +52,8 @@ const ( // We must avoid creating new replica set / counting pods until the replica set / pods store has synced. // If it hasn't synced, to avoid a hot loop, we'll wait this long between checks. StoreSyncedPollPeriod = 100 * time.Millisecond + // MaxRetries is the number of times a deployment will be retried before it is dropped out of the queue. + MaxRetries = 5 ) // DeploymentController is responsible for synchronizing Deployment objects stored @@ -70,19 +73,23 @@ type DeploymentController struct { rsStore cache.StoreToReplicaSetLister // Watches changes to all ReplicaSets rsController *framework.Controller - // rsStoreSynced returns true if the ReplicaSet store has been synced at least once. - // Added as a member to the struct to allow injection for testing. - rsStoreSynced func() bool // A store of pods, populated by the podController podStore cache.StoreToPodLister // Watches changes to all pods podController *framework.Controller + + // dStoreSynced returns true if the Deployment store has been synced at least once. + // Added as a member to the struct to allow injection for testing. + dStoreSynced func() bool + // rsStoreSynced returns true if the ReplicaSet store has been synced at least once. + // Added as a member to the struct to allow injection for testing. 
+ rsStoreSynced func() bool // podStoreSynced returns true if the pod store has been synced at least once. // Added as a member to the struct to allow injection for testing. podStoreSynced func() bool // Deployments that need to be synced - queue *workqueue.Type + queue workqueue.RateLimitingInterface } // NewDeploymentController creates a new DeploymentController. @@ -98,7 +105,7 @@ func NewDeploymentController(client clientset.Interface, resyncPeriod controller dc := &DeploymentController{ client: client, eventRecorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "deployment-controller"}), - queue: workqueue.New(), + queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), } dc.dStore.Store, dc.dController = framework.NewInformer( @@ -158,6 +165,7 @@ func NewDeploymentController(client clientset.Interface, resyncPeriod controller ) dc.syncHandler = dc.syncDeployment + dc.dStoreSynced = dc.dController.HasSynced dc.rsStoreSynced = dc.rsController.HasSynced dc.podStoreSynced = dc.podController.HasSynced return dc @@ -166,17 +174,43 @@ func NewDeploymentController(client clientset.Interface, resyncPeriod controller // Run begins watching and syncing. func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) { defer utilruntime.HandleCrash() + go dc.dController.Run(stopCh) go dc.rsController.Run(stopCh) go dc.podController.Run(stopCh) + + // Wait for the deployment, replica set, and pod stores to sync before starting any work in this controller. 
+ ready := make(chan struct{}) + go dc.waitForSyncedStores(ready, stopCh) + select { + case <-ready: + case <-stopCh: + return + } + for i := 0; i < workers; i++ { go wait.Until(dc.worker, time.Second, stopCh) } + <-stopCh glog.Infof("Shutting down deployment controller") dc.queue.ShutDown() } +func (dc *DeploymentController) waitForSyncedStores(ready chan<- struct{}, stopCh <-chan struct{}) { + defer utilruntime.HandleCrash() + + for !dc.dStoreSynced() || !dc.rsStoreSynced() || !dc.podStoreSynced() { + select { + case <-time.After(StoreSyncedPollPeriod): + case <-stopCh: + return + } + } + + close(ready) +} + func (dc *DeploymentController) addDeploymentNotification(obj interface{}) { d := obj.(*extensions.Deployment) glog.V(4).Infof("Adding deployment %s", d.Name) @@ -382,19 +416,40 @@ func (dc *DeploymentController) enqueueDeployment(deployment *extensions.Deploym // worker runs a worker thread that just dequeues items, processes them, and marks them done. // It enforces that the syncHandler is never invoked concurrently with the same key. func (dc *DeploymentController) worker() { - for { - func() { - key, quit := dc.queue.Get() - if quit { - return - } - defer dc.queue.Done(key) - err := dc.syncHandler(key.(string)) - if err != nil { - glog.Errorf("Error syncing deployment %v: %v", key, err) - } - }() + work := func() bool { + key, quit := dc.queue.Get() + if quit { + return true + } + defer dc.queue.Done(key) + + err := dc.syncHandler(key.(string)) + dc.handleErr(err, key) + + return false } + + for { + if quit := work(); quit { + return + } + } +} + +func (dc *DeploymentController) handleErr(err error, key interface{}) { + if err == nil { + dc.queue.Forget(key) + return + } + + if dc.queue.NumRequeues(key) < MaxRetries { + glog.V(2).Infof("Error syncing deployment %v: %v", key, err) + dc.queue.AddRateLimited(key) + return + } + + utilruntime.HandleError(err) + dc.queue.Forget(key) } // syncDeployment will sync the deployment with the given key. 
@@ -405,18 +460,9 @@ func (dc *DeploymentController) syncDeployment(key string) error { glog.V(4).Infof("Finished syncing deployment %q (%v)", key, time.Now().Sub(startTime)) }() - if !dc.rsStoreSynced() || !dc.podStoreSynced() { - // Sleep so we give the replica set / pod reflector goroutine a chance to run. - time.Sleep(StoreSyncedPollPeriod) - glog.Infof("Waiting for replica set / pod controller to sync, requeuing deployment %s", key) - dc.queue.Add(key) - return nil - } - obj, exists, err := dc.dStore.Store.GetByKey(key) if err != nil { glog.Infof("Unable to retrieve deployment %v from store: %v", key, err) - dc.queue.Add(key) return err } if !exists { @@ -424,13 +470,20 @@ func (dc *DeploymentController) syncDeployment(key string) error { return nil } - d := obj.(*extensions.Deployment) + deployment := obj.(*extensions.Deployment) everything := unversioned.LabelSelector{} - if reflect.DeepEqual(d.Spec.Selector, &everything) { - dc.eventRecorder.Eventf(d, api.EventTypeWarning, "SelectingAll", "This deployment is selecting all pods. A non-empty selector is required.") + if reflect.DeepEqual(deployment.Spec.Selector, &everything) { + dc.eventRecorder.Eventf(deployment, api.EventTypeWarning, "SelectingAll", "This deployment is selecting all pods. A non-empty selector is required.") return nil } + // Deep-copy otherwise we are mutating our cache. + // TODO: Deep-copy only when needed. 
+ d, err := util.DeploymentDeepCopy(deployment) + if err != nil { + return err + } + if d.Spec.Paused { return dc.sync(d) } diff --git a/pkg/controller/deployment/sync.go b/pkg/controller/deployment/sync.go index 8ce7059a152..f9b071217a5 100644 --- a/pkg/controller/deployment/sync.go +++ b/pkg/controller/deployment/sync.go @@ -273,7 +273,6 @@ func (dc *DeploymentController) getNewReplicaSet(deployment *extensions.Deployme deploymentutil.SetNewReplicaSetAnnotations(deployment, &newRS, newRevision, false) createdRS, err := dc.client.Extensions().ReplicaSets(namespace).Create(&newRS) if err != nil { - dc.enqueueDeployment(deployment) return nil, fmt.Errorf("error creating replica set %v: %v", deployment.Name, err) } if newReplicasCount > 0 { @@ -415,9 +414,6 @@ func (dc *DeploymentController) scaleReplicaSet(rs *extensions.ReplicaSet, newSc rs, err := dc.client.Extensions().ReplicaSets(rs.ObjectMeta.Namespace).Update(rs) if err == nil { dc.eventRecorder.Eventf(deployment, api.EventTypeNormal, "ScalingReplicaSet", "Scaled %s replica set %s to %d", scalingOperation, rs.Name, newScale) - } else { - glog.Warningf("Cannot update replica set %q: %v", rs.Name, err) - dc.enqueueDeployment(deployment) } return rs, err } diff --git a/pkg/controller/deployment/util/deployment_util.go b/pkg/controller/deployment/util/deployment_util.go index e99a55e6682..16eb42b249a 100644 --- a/pkg/controller/deployment/util/deployment_util.go +++ b/pkg/controller/deployment/util/deployment_util.go @@ -753,3 +753,15 @@ func ResolveFenceposts(maxSurge, maxUnavailable *intstrutil.IntOrString, desired return int32(surge), int32(unavailable), nil } + +func DeploymentDeepCopy(deployment *extensions.Deployment) (*extensions.Deployment, error) { + objCopy, err := api.Scheme.DeepCopy(deployment) + if err != nil { + return nil, err + } + copied, ok := objCopy.(*extensions.Deployment) + if !ok { + return nil, fmt.Errorf("expected Deployment, got %#v", objCopy) + } + return copied, nil +}