controller: various fixes for the deployment controller

Changes:
* moved waiting for synced caches before starting any work
* refactored worker() to really quit on quit
* changed queue to a rate-limiting queue and added retries on errors (see the sketch below)
* deep-copy deployments before mutating them - we still need to deep-copy replica sets and pods
Michail Kargakis 2016-07-08 14:48:38 +02:00
parent 5504c58ce2
commit 1fb8dd838b
3 changed files with 94 additions and 33 deletions
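
For reference, the retry flow enabled by the rate-limiting queue works roughly like the sketch below, which uses the workqueue package this controller imports (k8s.io/kubernetes/pkg/util/workqueue in this tree). The maxRetries constant, processOne helper, and sync callback are illustrative stand-ins, not the controller's exact code.

// Sketch of the rate-limited retry pattern: sync a key, Forget it on
// success, AddRateLimited it on failure until maxRetries is reached.
package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/util/workqueue"
)

const maxRetries = 5

// processOne handles a single key from the queue and reports whether the
// caller should keep working (false once the queue has been shut down).
func processOne(queue workqueue.RateLimitingInterface, sync func(key string) error) bool {
	key, quit := queue.Get()
	if quit {
		return false
	}
	defer queue.Done(key)

	if err := sync(key.(string)); err == nil {
		// Success: reset the rate limiter's failure history for this key.
		queue.Forget(key)
	} else if queue.NumRequeues(key) < maxRetries {
		// Transient error: requeue with backoff instead of dropping the key.
		queue.AddRateLimited(key)
	} else {
		// Persistent error: give up until something enqueues the key again.
		queue.Forget(key)
	}
	return true
}

func main() {
	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
	queue.Add("default/my-deployment")
	processOne(queue, func(key string) error {
		fmt.Println("syncing", key)
		return nil
	})
	queue.ShutDown()
}

Calling Forget on success matters: it clears the per-key backoff, so a deployment that later fails again starts from a short retry delay rather than an inflated one.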

@@ -34,6 +34,7 @@ import (
 	unversionedcore "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned"
 	"k8s.io/kubernetes/pkg/client/record"
 	"k8s.io/kubernetes/pkg/controller"
+	"k8s.io/kubernetes/pkg/controller/deployment/util"
 	"k8s.io/kubernetes/pkg/controller/framework"
 	"k8s.io/kubernetes/pkg/runtime"
 	"k8s.io/kubernetes/pkg/util/metrics"
@@ -51,6 +52,8 @@ const (
 	// We must avoid creating new replica set / counting pods until the replica set / pods store has synced.
 	// If it hasn't synced, to avoid a hot loop, we'll wait this long between checks.
 	StoreSyncedPollPeriod = 100 * time.Millisecond
+	// MaxRetries is the number of times a deployment will be retried before it is dropped out of the queue.
+	MaxRetries = 5
 )
 
 // DeploymentController is responsible for synchronizing Deployment objects stored
@@ -70,19 +73,23 @@ type DeploymentController struct {
 	rsStore cache.StoreToReplicaSetLister
 	// Watches changes to all ReplicaSets
 	rsController *framework.Controller
-	// rsStoreSynced returns true if the ReplicaSet store has been synced at least once.
-	// Added as a member to the struct to allow injection for testing.
-	rsStoreSynced func() bool
 	// A store of pods, populated by the podController
 	podStore cache.StoreToPodLister
 	// Watches changes to all pods
 	podController *framework.Controller
+	// dStoreSynced returns true if the Deployment store has been synced at least once.
+	// Added as a member to the struct to allow injection for testing.
+	dStoreSynced func() bool
+	// rsStoreSynced returns true if the ReplicaSet store has been synced at least once.
+	// Added as a member to the struct to allow injection for testing.
+	rsStoreSynced func() bool
 	// podStoreSynced returns true if the pod store has been synced at least once.
 	// Added as a member to the struct to allow injection for testing.
 	podStoreSynced func() bool
 	// Deployments that need to be synced
-	queue *workqueue.Type
+	queue workqueue.RateLimitingInterface
 }
 
 // NewDeploymentController creates a new DeploymentController.
@@ -98,7 +105,7 @@ func NewDeploymentController(client clientset.Interface, resyncPeriod controller
 	dc := &DeploymentController{
 		client: client,
 		eventRecorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "deployment-controller"}),
-		queue: workqueue.New(),
+		queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
 	}
 
 	dc.dStore.Store, dc.dController = framework.NewInformer(
@@ -158,6 +165,7 @@ func NewDeploymentController(client clientset.Interface, resyncPeriod controller
 	)
 
 	dc.syncHandler = dc.syncDeployment
+	dc.dStoreSynced = dc.dController.HasSynced
 	dc.rsStoreSynced = dc.rsController.HasSynced
 	dc.podStoreSynced = dc.podController.HasSynced
 	return dc
@@ -166,17 +174,43 @@ func NewDeploymentController(client clientset.Interface, resyncPeriod controller
 // Run begins watching and syncing.
 func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) {
 	defer utilruntime.HandleCrash()
+
 	go dc.dController.Run(stopCh)
 	go dc.rsController.Run(stopCh)
 	go dc.podController.Run(stopCh)
+
+	// Wait for the rc and dc stores to sync before starting any work in this controller.
+	ready := make(chan struct{})
+	go dc.waitForSyncedStores(ready, stopCh)
+	select {
+	case <-ready:
+	case <-stopCh:
+		return
+	}
+
 	for i := 0; i < workers; i++ {
 		go wait.Until(dc.worker, time.Second, stopCh)
 	}
+
 	<-stopCh
 	glog.Infof("Shutting down deployment controller")
 	dc.queue.ShutDown()
 }
 
+func (dc *DeploymentController) waitForSyncedStores(ready chan<- struct{}, stopCh <-chan struct{}) {
+	defer utilruntime.HandleCrash()
+
+	for !dc.dStoreSynced() || !dc.rsStoreSynced() || !dc.podStoreSynced() {
+		select {
+		case <-time.After(StoreSyncedPollPeriod):
+		case <-stopCh:
+			return
+		}
+	}
+
+	close(ready)
+}
+
 func (dc *DeploymentController) addDeploymentNotification(obj interface{}) {
 	d := obj.(*extensions.Deployment)
 	glog.V(4).Infof("Adding deployment %s", d.Name)
@@ -382,20 +416,41 @@ func (dc *DeploymentController) enqueueDeployment(deployment *extensions.Deploym
 // worker runs a worker thread that just dequeues items, processes them, and marks them done.
 // It enforces that the syncHandler is never invoked concurrently with the same key.
 func (dc *DeploymentController) worker() {
-	for {
-		func() {
-			key, quit := dc.queue.Get()
-			if quit {
-				return
-			}
-			defer dc.queue.Done(key)
-			err := dc.syncHandler(key.(string))
-			if err != nil {
-				glog.Errorf("Error syncing deployment %v: %v", key, err)
-			}
-		}()
+	work := func() bool {
+		key, quit := dc.queue.Get()
+		if quit {
+			return true
+		}
+		defer dc.queue.Done(key)
+
+		err := dc.syncHandler(key.(string))
+		dc.handleErr(err, key)
+
+		return false
+	}
+
+	for {
+		if quit := work(); quit {
+			return
+		}
 	}
 }
+
+func (dc *DeploymentController) handleErr(err error, key interface{}) {
+	if err == nil {
+		dc.queue.Forget(key)
+		return
+	}
+
+	if dc.queue.NumRequeues(key) < MaxRetries {
+		glog.V(2).Infof("Error syncing deployment %v: %v", key, err)
+		dc.queue.AddRateLimited(key)
+		return
+	}
+
+	utilruntime.HandleError(err)
+	dc.queue.Forget(key)
+}
 
 // syncDeployment will sync the deployment with the given key.
 // This function is not meant to be invoked concurrently with the same key.
@@ -405,18 +460,9 @@ func (dc *DeploymentController) syncDeployment(key string) error {
 		glog.V(4).Infof("Finished syncing deployment %q (%v)", key, time.Now().Sub(startTime))
 	}()
 
-	if !dc.rsStoreSynced() || !dc.podStoreSynced() {
-		// Sleep so we give the replica set / pod reflector goroutine a chance to run.
-		time.Sleep(StoreSyncedPollPeriod)
-		glog.Infof("Waiting for replica set / pod controller to sync, requeuing deployment %s", key)
-		dc.queue.Add(key)
-		return nil
-	}
-
 	obj, exists, err := dc.dStore.Store.GetByKey(key)
 	if err != nil {
 		glog.Infof("Unable to retrieve deployment %v from store: %v", key, err)
-		dc.queue.Add(key)
 		return err
 	}
 	if !exists {
@@ -424,13 +470,20 @@ func (dc *DeploymentController) syncDeployment(key string) error {
 		return nil
 	}
 
-	d := obj.(*extensions.Deployment)
+	deployment := obj.(*extensions.Deployment)
 	everything := unversioned.LabelSelector{}
-	if reflect.DeepEqual(d.Spec.Selector, &everything) {
-		dc.eventRecorder.Eventf(d, api.EventTypeWarning, "SelectingAll", "This deployment is selecting all pods. A non-empty selector is required.")
+	if reflect.DeepEqual(deployment.Spec.Selector, &everything) {
+		dc.eventRecorder.Eventf(deployment, api.EventTypeWarning, "SelectingAll", "This deployment is selecting all pods. A non-empty selector is required.")
 		return nil
 	}
 
+	// Deep-copy otherwise we are mutating our cache.
+	// TODO: Deep-copy only when needed.
+	d, err := util.DeploymentDeepCopy(deployment)
+	if err != nil {
+		return err
+	}
+
 	if d.Spec.Paused {
 		return dc.sync(d)
 	}

@@ -273,7 +273,6 @@ func (dc *DeploymentController) getNewReplicaSet(deployment *extensions.Deployme
 	deploymentutil.SetNewReplicaSetAnnotations(deployment, &newRS, newRevision, false)
 	createdRS, err := dc.client.Extensions().ReplicaSets(namespace).Create(&newRS)
 	if err != nil {
-		dc.enqueueDeployment(deployment)
 		return nil, fmt.Errorf("error creating replica set %v: %v", deployment.Name, err)
 	}
 	if newReplicasCount > 0 {
@@ -415,9 +414,6 @@ func (dc *DeploymentController) scaleReplicaSet(rs *extensions.ReplicaSet, newSc
 	rs, err := dc.client.Extensions().ReplicaSets(rs.ObjectMeta.Namespace).Update(rs)
 	if err == nil {
 		dc.eventRecorder.Eventf(deployment, api.EventTypeNormal, "ScalingReplicaSet", "Scaled %s replica set %s to %d", scalingOperation, rs.Name, newScale)
-	} else {
-		glog.Warningf("Cannot update replica set %q: %v", rs.Name, err)
-		dc.enqueueDeployment(deployment)
 	}
 	return rs, err
 }

View File

@@ -753,3 +753,15 @@ func ResolveFenceposts(maxSurge, maxUnavailable *intstrutil.IntOrString, desired
 	return int32(surge), int32(unavailable), nil
 }
 
+func DeploymentDeepCopy(deployment *extensions.Deployment) (*extensions.Deployment, error) {
+	objCopy, err := api.Scheme.DeepCopy(deployment)
+	if err != nil {
+		return nil, err
+	}
+	copied, ok := objCopy.(*extensions.Deployment)
+	if !ok {
+		return nil, fmt.Errorf("expected Deployment, got %#v", objCopy)
+	}
+	return copied, nil
+}