diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index f05f23b31d1..c258ab7ceba 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -415,7 +415,7 @@ func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, rootCl if containsResource(resources, "deployments") { glog.Infof("Starting deployment controller") - go deployment.NewDeploymentController(client("deployment-controller"), ResyncPeriod(s)). + go deployment.NewDeploymentController(sharedInformers.Deployments(), sharedInformers.ReplicaSets(), sharedInformers.Pods(), client("deployment-controller")). Run(int(s.ConcurrentDeploymentSyncs), wait.NeverStop) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) } diff --git a/pkg/controller/deployment/deployment_controller.go b/pkg/controller/deployment/deployment_controller.go index 2a7e7a65c13..753d053db40 100644 --- a/pkg/controller/deployment/deployment_controller.go +++ b/pkg/controller/deployment/deployment_controller.go @@ -36,13 +36,12 @@ import ( "k8s.io/kubernetes/pkg/client/record" "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/deployment/util" + "k8s.io/kubernetes/pkg/controller/informers" "k8s.io/kubernetes/pkg/labels" - "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/util/metrics" utilruntime "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/util/workqueue" - "k8s.io/kubernetes/pkg/watch" ) const ( @@ -67,34 +66,28 @@ type DeploymentController struct { syncHandler func(dKey string) error // A store of deployments, populated by the dController - dLister cache.StoreToDeploymentLister - // Watches changes to all deployments - dController *cache.Controller + dLister *cache.StoreToDeploymentLister // A store of ReplicaSets, populated by the rsController - rsLister cache.StoreToReplicaSetLister - // Watches changes to all ReplicaSets - rsController *cache.Controller + rsLister *cache.StoreToReplicaSetLister // A store of pods, populated by the podController - podLister cache.StoreToPodLister - // Watches changes to all pods - podController *cache.Controller + podLister *cache.StoreToPodLister // dListerSynced returns true if the Deployment store has been synced at least once. // Added as a member to the struct to allow injection for testing. - dListerSynced func() bool + dListerSynced cache.InformerSynced // rsListerSynced returns true if the ReplicaSet store has been synced at least once. // Added as a member to the struct to allow injection for testing. - rsListerSynced func() bool + rsListerSynced cache.InformerSynced // podListerSynced returns true if the pod store has been synced at least once. // Added as a member to the struct to allow injection for testing. - podListerSynced func() bool + podListerSynced cache.InformerSynced // Deployments that need to be synced queue workqueue.RateLimitingInterface } // NewDeploymentController creates a new DeploymentController. -func NewDeploymentController(client clientset.Interface, resyncPeriod controller.ResyncPeriodFunc) *DeploymentController { +func NewDeploymentController(dInformer informers.DeploymentInformer, rsInformer informers.ReplicaSetInformer, podInformer informers.PodInformer, client clientset.Interface) *DeploymentController { eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(glog.Infof) // TODO: remove the wrapper when every clients have moved to use the clientset. @@ -109,85 +102,41 @@ func NewDeploymentController(client clientset.Interface, resyncPeriod controller queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deployment"), } - dc.dLister.Indexer, dc.dController = cache.NewIndexerInformer( - &cache.ListWatch{ - ListFunc: func(options api.ListOptions) (runtime.Object, error) { - return dc.client.Extensions().Deployments(api.NamespaceAll).List(options) - }, - WatchFunc: func(options api.ListOptions) (watch.Interface, error) { - return dc.client.Extensions().Deployments(api.NamespaceAll).Watch(options) - }, - }, - &extensions.Deployment{}, - FullDeploymentResyncPeriod, - cache.ResourceEventHandlerFuncs{ - AddFunc: dc.addDeploymentNotification, - UpdateFunc: dc.updateDeploymentNotification, - // This will enter the sync loop and no-op, because the deployment has been deleted from the store. - DeleteFunc: dc.deleteDeploymentNotification, - }, - cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, - ) - - dc.rsLister.Indexer, dc.rsController = cache.NewIndexerInformer( - &cache.ListWatch{ - ListFunc: func(options api.ListOptions) (runtime.Object, error) { - return dc.client.Extensions().ReplicaSets(api.NamespaceAll).List(options) - }, - WatchFunc: func(options api.ListOptions) (watch.Interface, error) { - return dc.client.Extensions().ReplicaSets(api.NamespaceAll).Watch(options) - }, - }, - &extensions.ReplicaSet{}, - resyncPeriod(), - cache.ResourceEventHandlerFuncs{ - AddFunc: dc.addReplicaSet, - UpdateFunc: dc.updateReplicaSet, - DeleteFunc: dc.deleteReplicaSet, - }, - cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, - ) - - dc.podLister.Indexer, dc.podController = cache.NewIndexerInformer( - &cache.ListWatch{ - ListFunc: func(options api.ListOptions) (runtime.Object, error) { - return dc.client.Core().Pods(api.NamespaceAll).List(options) - }, - WatchFunc: func(options api.ListOptions) (watch.Interface, error) { - return dc.client.Core().Pods(api.NamespaceAll).Watch(options) - }, - }, - &api.Pod{}, - resyncPeriod(), - cache.ResourceEventHandlerFuncs{ - AddFunc: dc.addPod, - UpdateFunc: dc.updatePod, - DeleteFunc: dc.deletePod, - }, - cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, - ) + dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: dc.addDeploymentNotification, + UpdateFunc: dc.updateDeploymentNotification, + // This will enter the sync loop and no-op, because the deployment has been deleted from the store. + DeleteFunc: dc.deleteDeploymentNotification, + }) + rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: dc.addReplicaSet, + UpdateFunc: dc.updateReplicaSet, + DeleteFunc: dc.deleteReplicaSet, + }) + podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: dc.addPod, + UpdateFunc: dc.updatePod, + DeleteFunc: dc.deletePod, + }) dc.syncHandler = dc.syncDeployment - dc.dListerSynced = dc.dController.HasSynced - dc.rsListerSynced = dc.rsController.HasSynced - dc.podListerSynced = dc.podController.HasSynced + dc.dLister = dInformer.Lister() + dc.rsLister = rsInformer.Lister() + dc.podLister = podInformer.Lister() + dc.dListerSynced = dInformer.Informer().HasSynced + dc.rsListerSynced = dInformer.Informer().HasSynced + dc.podListerSynced = dInformer.Informer().HasSynced return dc } // Run begins watching and syncing. func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) { defer utilruntime.HandleCrash() + defer dc.queue.ShutDown() - go dc.dController.Run(stopCh) - go dc.rsController.Run(stopCh) - go dc.podController.Run(stopCh) + glog.Infof("Starting deployment controller") - // Wait for the rc and dc stores to sync before starting any work in this controller. - ready := make(chan struct{}) - go dc.waitForSyncedListers(ready, stopCh) - select { - case <-ready: - case <-stopCh: + if !cache.WaitForCacheSync(stopCh, dc.dListerSynced, dc.rsListerSynced, dc.podListerSynced) { return } @@ -197,21 +146,6 @@ func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) { <-stopCh glog.Infof("Shutting down deployment controller") - dc.queue.ShutDown() -} - -func (dc *DeploymentController) waitForSyncedListers(ready chan<- struct{}, stopCh <-chan struct{}) { - defer utilruntime.HandleCrash() - - for !dc.dListerSynced() || !dc.rsListerSynced() || !dc.podListerSynced() { - select { - case <-time.After(StoreSyncedPollPeriod): - case <-stopCh: - return - } - } - - close(ready) } func (dc *DeploymentController) addDeploymentNotification(obj interface{}) { diff --git a/pkg/controller/deployment/deployment_controller_test.go b/pkg/controller/deployment/deployment_controller_test.go index 20305942b0a..4d6e1aaa024 100644 --- a/pkg/controller/deployment/deployment_controller_test.go +++ b/pkg/controller/deployment/deployment_controller_test.go @@ -28,6 +28,7 @@ import ( "k8s.io/kubernetes/pkg/client/record" "k8s.io/kubernetes/pkg/client/testing/core" "k8s.io/kubernetes/pkg/controller" + "k8s.io/kubernetes/pkg/controller/informers" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/util/intstr" "k8s.io/kubernetes/pkg/util/uuid" @@ -168,8 +169,10 @@ func newFixture(t *testing.T) *fixture { func (f *fixture) run(deploymentName string) { f.client = fake.NewSimpleClientset(f.objects...) - c := NewDeploymentController(f.client, controller.NoResyncPeriodFunc) + informers := informers.NewSharedInformerFactory(f.client, controller.NoResyncPeriodFunc()) + c := NewDeploymentController(informers.Deployments(), informers.ReplicaSets(), informers.Pods(), f.client) c.eventRecorder = &record.FakeRecorder{} + c.dListerSynced = alwaysReady c.rsListerSynced = alwaysReady c.podListerSynced = alwaysReady for _, d := range f.dLister { @@ -181,6 +184,9 @@ func (f *fixture) run(deploymentName string) { for _, pod := range f.podLister { c.podLister.Indexer.Add(pod) } + stopCh := make(chan struct{}) + defer close(stopCh) + informers.Start(stopCh) err := c.syncDeployment(deploymentName) if err != nil { @@ -188,13 +194,25 @@ func (f *fixture) run(deploymentName string) { } actions := f.client.Actions() + informerActions := 0 for i, action := range actions { - if len(f.actions) < i+1 { + if len(action.GetNamespace()) == 0 && + (action.Matches("list", "pods") || + action.Matches("list", "replicasets") || + action.Matches("list", "deployments") || + action.Matches("watch", "pods") || + action.Matches("watch", "replicasets") || + action.Matches("watch", "deployments")) { + informerActions++ + continue + } + + if len(f.actions)+informerActions < i+1 { f.t.Errorf("%d unexpected actions: %+v", len(actions)-len(f.actions), actions[i:]) break } - expectedAction := f.actions[i] + expectedAction := f.actions[i-informerActions] if !expectedAction.Matches(action.GetVerb(), action.GetResource().Resource) { f.t.Errorf("Expected\n\t%#v\ngot\n\t%#v", expectedAction, action) continue @@ -236,12 +254,17 @@ func TestSyncDeploymentDontDoAnythingDuringDeletion(t *testing.T) { // issue: https://github.com/kubernetes/kubernetes/issues/23218 func TestDeploymentController_dontSyncDeploymentsWithEmptyPodSelector(t *testing.T) { fake := &fake.Clientset{} - controller := NewDeploymentController(fake, controller.NoResyncPeriodFunc) - + informers := informers.NewSharedInformerFactory(fake, controller.NoResyncPeriodFunc()) + controller := NewDeploymentController(informers.Deployments(), informers.ReplicaSets(), informers.Pods(), fake) controller.eventRecorder = &record.FakeRecorder{} + controller.dListerSynced = alwaysReady controller.rsListerSynced = alwaysReady controller.podListerSynced = alwaysReady + stopCh := make(chan struct{}) + defer close(stopCh) + informers.Start(stopCh) + d := newDeployment("foo", 1, nil, nil, nil, map[string]string{"foo": "bar"}) empty := unversioned.LabelSelector{} d.Spec.Selector = &empty diff --git a/pkg/controller/deployment/sync_test.go b/pkg/controller/deployment/sync_test.go index a2c7bf11c7b..83704059a01 100644 --- a/pkg/controller/deployment/sync_test.go +++ b/pkg/controller/deployment/sync_test.go @@ -26,6 +26,7 @@ import ( "k8s.io/kubernetes/pkg/client/record" "k8s.io/kubernetes/pkg/controller" deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util" + "k8s.io/kubernetes/pkg/controller/informers" "k8s.io/kubernetes/pkg/util/intstr" ) @@ -346,15 +347,21 @@ func TestDeploymentController_cleanupDeployment(t *testing.T) { for i := range tests { test := tests[i] fake := &fake.Clientset{} - controller := NewDeploymentController(fake, controller.NoResyncPeriodFunc) + informers := informers.NewSharedInformerFactory(fake, controller.NoResyncPeriodFunc()) + controller := NewDeploymentController(informers.Deployments(), informers.ReplicaSets(), informers.Pods(), fake) controller.eventRecorder = &record.FakeRecorder{} + controller.dListerSynced = alwaysReady controller.rsListerSynced = alwaysReady controller.podListerSynced = alwaysReady for _, rs := range test.oldRSs { controller.rsLister.Indexer.Add(rs) } + stopCh := make(chan struct{}) + defer close(stopCh) + informers.Start(stopCh) + d := newDeployment("foo", 1, &test.revisionHistoryLimit, nil, nil, map[string]string{"foo": "bar"}) controller.cleanupDeployment(test.oldRSs, d) diff --git a/pkg/controller/informers/extensions.go b/pkg/controller/informers/extensions.go index ba9a7a1fc98..95e9e8d67fa 100644 --- a/pkg/controller/informers/extensions.go +++ b/pkg/controller/informers/extensions.go @@ -18,6 +18,7 @@ package informers import ( "reflect" + "time" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/apis/extensions" @@ -98,7 +99,9 @@ func (f *deploymentInformer) Informer() cache.SharedIndexInformer { }, }, &extensions.Deployment{}, - f.defaultResync, + // TODO remove this. It is hardcoded so that "Waiting for the second deployment to clear overlapping annotation" in + // "overlapping deployment should not fight with each other" will work since it requires a full resync to work properly. + 30*time.Second, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, ) f.informers[informerType] = informer