diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 3e7e3ed5261..3537d4824c7 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -529,10 +529,11 @@ func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, rootCl } } - serviceaccountcontroller.NewServiceAccountsController( + go serviceaccountcontroller.NewServiceAccountsController( + sharedInformers.ServiceAccounts(), sharedInformers.Namespaces(), client("service-account-controller"), serviceaccountcontroller.DefaultServiceAccountsControllerOptions(), - ).Run() + ).Run(1, stop) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) if s.EnableGarbageCollector { diff --git a/pkg/client/cache/listers.go b/pkg/client/cache/listers.go index 7bc5706e3a5..3b818d7e024 100644 --- a/pkg/client/cache/listers.go +++ b/pkg/client/cache/listers.go @@ -450,23 +450,6 @@ func (s *StoreToCertificateRequestLister) List() (csrs certificates.CertificateS return csrs, nil } -// IndexerToNamespaceLister gives an Indexer List method -type IndexerToNamespaceLister struct { - Indexer -} - -// List returns a list of namespaces -func (i *IndexerToNamespaceLister) List(selector labels.Selector) (namespaces []*api.Namespace, err error) { - for _, m := range i.Indexer.List() { - namespace := m.(*api.Namespace) - if selector.Matches(labels.Set(namespace.Labels)) { - namespaces = append(namespaces, namespace) - } - } - - return namespaces, nil -} - type StoreToPodDisruptionBudgetLister struct { Store } diff --git a/pkg/client/cache/listers_core.go b/pkg/client/cache/listers_core.go index 063f61e8827..1e8eb6055e8 100644 --- a/pkg/client/cache/listers_core.go +++ b/pkg/client/cache/listers_core.go @@ -322,3 +322,27 @@ func (s storePersistentVolumeClaimsNamespacer) Get(name string) (*api.Persistent } return obj.(*api.PersistentVolumeClaim), nil } + +// IndexerToNamespaceLister gives an Indexer List method +type IndexerToNamespaceLister struct { + Indexer +} + +// List returns a list of namespaces +func (i *IndexerToNamespaceLister) List(selector labels.Selector) (ret []*api.Namespace, err error) { + err = ListAll(i.Indexer, selector, func(m interface{}) { + ret = append(ret, m.(*api.Namespace)) + }) + return ret, err +} + +func (i *IndexerToNamespaceLister) Get(name string) (*api.Namespace, error) { + obj, exists, err := i.Indexer.GetByKey(name) + if err != nil { + return nil, err + } + if !exists { + return nil, errors.NewNotFound(api.Resource("namespace"), name) + } + return obj.(*api.Namespace), nil +} diff --git a/pkg/controller/serviceaccount/BUILD b/pkg/controller/serviceaccount/BUILD index fd711628684..fafb2f7fb17 100644 --- a/pkg/controller/serviceaccount/BUILD +++ b/pkg/controller/serviceaccount/BUILD @@ -26,6 +26,7 @@ go_library( "//pkg/client/cache:go_default_library", "//pkg/client/clientset_generated/internalclientset:go_default_library", "//pkg/client/retry:go_default_library", + "//pkg/controller/informers:go_default_library", "//pkg/fields:go_default_library", "//pkg/registry/core/secret:go_default_library", "//pkg/registry/core/secret/etcd:go_default_library", @@ -59,8 +60,11 @@ go_test( "//pkg/api:go_default_library", "//pkg/api/errors:go_default_library", "//pkg/api/unversioned:go_default_library", + "//pkg/client/cache:go_default_library", "//pkg/client/clientset_generated/internalclientset/fake:go_default_library", "//pkg/client/testing/core:go_default_library", + "//pkg/controller:go_default_library", + "//pkg/controller/informers:go_default_library", "//pkg/runtime:go_default_library", "//pkg/util/rand:go_default_library", "//pkg/util/sets:go_default_library", diff --git a/pkg/controller/serviceaccount/serviceaccounts_controller.go b/pkg/controller/serviceaccount/serviceaccounts_controller.go index b54b04dbc33..e94bf891e69 100644 --- a/pkg/controller/serviceaccount/serviceaccounts_controller.go +++ b/pkg/controller/serviceaccount/serviceaccounts_controller.go @@ -26,10 +26,12 @@ import ( "k8s.io/kubernetes/pkg/api/meta" "k8s.io/kubernetes/pkg/client/cache" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" - "k8s.io/kubernetes/pkg/fields" - "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/controller/informers" + utilerrors "k8s.io/kubernetes/pkg/util/errors" "k8s.io/kubernetes/pkg/util/metrics" - "k8s.io/kubernetes/pkg/watch" + utilruntime "k8s.io/kubernetes/pkg/util/runtime" + "k8s.io/kubernetes/pkg/util/wait" + "k8s.io/kubernetes/pkg/util/workqueue" ) // nameIndexFunc is an index function that indexes based on an object's name @@ -66,193 +68,159 @@ func DefaultServiceAccountsControllerOptions() ServiceAccountsControllerOptions } // NewServiceAccountsController returns a new *ServiceAccountsController. -func NewServiceAccountsController(cl clientset.Interface, options ServiceAccountsControllerOptions) *ServiceAccountsController { +func NewServiceAccountsController(saInformer informers.ServiceAccountInformer, nsInformer informers.NamespaceInformer, cl clientset.Interface, options ServiceAccountsControllerOptions) *ServiceAccountsController { e := &ServiceAccountsController{ client: cl, serviceAccountsToEnsure: options.ServiceAccounts, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "serviceaccount"), } if cl != nil && cl.Core().RESTClient().GetRateLimiter() != nil { metrics.RegisterMetricAndTrackRateLimiterUsage("serviceaccount_controller", cl.Core().RESTClient().GetRateLimiter()) } - accountSelector := fields.Everything() - if len(options.ServiceAccounts) == 1 { - // If we're maintaining a single account, we can scope the accounts we watch to just that name - accountSelector = fields.SelectorFromSet(map[string]string{api.ObjectNameField: options.ServiceAccounts[0].Name}) - } - e.serviceAccounts, e.serviceAccountController = cache.NewIndexerInformer( - &cache.ListWatch{ - ListFunc: func(options api.ListOptions) (runtime.Object, error) { - options.FieldSelector = accountSelector - return e.client.Core().ServiceAccounts(api.NamespaceAll).List(options) - }, - WatchFunc: func(options api.ListOptions) (watch.Interface, error) { - options.FieldSelector = accountSelector - return e.client.Core().ServiceAccounts(api.NamespaceAll).Watch(options) - }, - }, - &api.ServiceAccount{}, - options.ServiceAccountResync, - cache.ResourceEventHandlerFuncs{ - DeleteFunc: e.serviceAccountDeleted, - }, - cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc}, - ) - e.namespaces, e.namespaceController = cache.NewIndexerInformer( - &cache.ListWatch{ - ListFunc: func(options api.ListOptions) (runtime.Object, error) { - return e.client.Core().Namespaces().List(options) - }, - WatchFunc: func(options api.ListOptions) (watch.Interface, error) { - return e.client.Core().Namespaces().Watch(options) - }, - }, - &api.Namespace{}, - options.NamespaceResync, - cache.ResourceEventHandlerFuncs{ - AddFunc: e.namespaceAdded, - UpdateFunc: e.namespaceUpdated, - }, - cache.Indexers{"name": nameIndexFunc}, - ) + saInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + DeleteFunc: e.serviceAccountDeleted, + }) + nsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: e.namespaceAdded, + UpdateFunc: e.namespaceUpdated, + }) + + e.saSynced = saInformer.Informer().HasSynced + e.saLister = saInformer.Lister() + e.nsSynced = nsInformer.Informer().HasSynced + e.nsLister = nsInformer.Lister() + + e.syncHandler = e.syncNamespace return e } // ServiceAccountsController manages ServiceAccount objects inside Namespaces type ServiceAccountsController struct { - stopChan chan struct{} - client clientset.Interface serviceAccountsToEnsure []api.ServiceAccount - serviceAccounts cache.Indexer - namespaces cache.Indexer + // To allow injection for testing. + syncHandler func(key string) error - // Since we join two objects, we'll watch both of them with controllers. - serviceAccountController *cache.Controller - namespaceController *cache.Controller + saLister *cache.StoreToServiceAccountLister + nsLister *cache.IndexerToNamespaceLister + + saSynced cache.InformerSynced + nsSynced cache.InformerSynced + + queue workqueue.RateLimitingInterface } -// Runs controller loops and returns immediately -func (e *ServiceAccountsController) Run() { - if e.stopChan == nil { - e.stopChan = make(chan struct{}) - go e.serviceAccountController.Run(e.stopChan) - go e.namespaceController.Run(e.stopChan) - } -} +func (c *ServiceAccountsController) Run(workers int, stopCh <-chan struct{}) { + defer utilruntime.HandleCrash() + defer c.queue.ShutDown() -// Stop gracefully shuts down this controller -func (e *ServiceAccountsController) Stop() { - if e.stopChan != nil { - close(e.stopChan) - e.stopChan = nil + glog.Infof("Starting ServiceAccount controller") + + if !cache.WaitForCacheSync(stopCh, c.saSynced) { + return } + + for i := 0; i < workers; i++ { + go wait.Until(c.runWorker, time.Second, stopCh) + } + + <-stopCh + glog.Infof("Shutting down ServiceAccount controller") } // serviceAccountDeleted reacts to a ServiceAccount deletion by recreating a default ServiceAccount in the namespace if needed -func (e *ServiceAccountsController) serviceAccountDeleted(obj interface{}) { - serviceAccount, ok := obj.(*api.ServiceAccount) +func (c *ServiceAccountsController) serviceAccountDeleted(obj interface{}) { + sa, ok := obj.(*api.ServiceAccount) if !ok { - // Unknown type. If we missed a ServiceAccount deletion, the - // corresponding secrets will be cleaned up during the Secret re-list - return - } - // If the deleted service account is one we're maintaining, recreate it - for _, sa := range e.serviceAccountsToEnsure { - if sa.Name == serviceAccount.Name { - e.createServiceAccountIfNeeded(sa, serviceAccount.Namespace) + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj)) + return + } + sa, ok = tombstone.Obj.(*api.ServiceAccount) + if !ok { + utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a ServiceAccount %#v", obj)) + return } } + c.queue.Add(sa.Namespace) } // namespaceAdded reacts to a Namespace creation by creating a default ServiceAccount object -func (e *ServiceAccountsController) namespaceAdded(obj interface{}) { +func (c *ServiceAccountsController) namespaceAdded(obj interface{}) { namespace := obj.(*api.Namespace) - for _, sa := range e.serviceAccountsToEnsure { - e.createServiceAccountIfNeeded(sa, namespace.Name) - } + c.queue.Add(namespace.Name) } // namespaceUpdated reacts to a Namespace update (or re-list) by creating a default ServiceAccount in the namespace if needed -func (e *ServiceAccountsController) namespaceUpdated(oldObj interface{}, newObj interface{}) { +func (c *ServiceAccountsController) namespaceUpdated(oldObj interface{}, newObj interface{}) { newNamespace := newObj.(*api.Namespace) - for _, sa := range e.serviceAccountsToEnsure { - e.createServiceAccountIfNeeded(sa, newNamespace.Name) + c.queue.Add(newNamespace.Name) +} + +func (c *ServiceAccountsController) runWorker() { + for c.processNextWorkItem() { } } -// createServiceAccountIfNeeded creates a ServiceAccount with the given name in the given namespace if: -// * the named ServiceAccount does not already exist -// * the specified namespace exists -// * the specified namespace is in the ACTIVE phase -func (e *ServiceAccountsController) createServiceAccountIfNeeded(sa api.ServiceAccount, namespace string) { - existingServiceAccount, err := e.getServiceAccount(sa.Name, namespace) - if err != nil { - glog.Error(err) - return +// processNextWorkItem deals with one key off the queue. It returns false when it's time to quit. +func (c *ServiceAccountsController) processNextWorkItem() bool { + key, quit := c.queue.Get() + if quit { + return false } - if existingServiceAccount != nil { - // If service account already exists, it doesn't need to be created - return + defer c.queue.Done(key) + + err := c.syncHandler(key.(string)) + if err == nil { + c.queue.Forget(key) + return true } - namespaceObj, err := e.getNamespace(namespace) + utilruntime.HandleError(fmt.Errorf("%v failed with : %v", key, err)) + c.queue.AddRateLimited(key) + + return true +} +func (c *ServiceAccountsController) syncNamespace(key string) error { + startTime := time.Now() + defer func() { + glog.V(4).Infof("Finished syncing namespace %q (%v)", key, time.Now().Sub(startTime)) + }() + + ns, err := c.nsLister.Get(key) + if apierrs.IsNotFound(err) { + return nil + } if err != nil { - glog.Error(err) - return + return err } - if namespaceObj == nil { - // If namespace does not exist, no service account is needed - return - } - if namespaceObj.Status.Phase != api.NamespaceActive { + if ns.Status.Phase != api.NamespaceActive { // If namespace is not active, we shouldn't try to create anything - return + return nil } - e.createServiceAccount(sa, namespace) -} + createFailures := []error{} + for i := range c.serviceAccountsToEnsure { + sa := c.serviceAccountsToEnsure[i] + switch _, err := c.saLister.ServiceAccounts(ns.Name).Get(sa.Name); { + case err == nil: + continue + case apierrs.IsNotFound(err): + case err != nil: + return err + } + // this is only safe because we never read it and we always write it + // TODO eliminate this once the fake client can handle creation without NS + sa.Namespace = ns.Name -// createDefaultServiceAccount creates a default ServiceAccount in the specified namespace -func (e *ServiceAccountsController) createServiceAccount(sa api.ServiceAccount, namespace string) { - sa.Namespace = namespace - if _, err := e.client.Core().ServiceAccounts(namespace).Create(&sa); err != nil && !apierrs.IsAlreadyExists(err) { - glog.Error(err) - } -} - -// getServiceAccount returns the ServiceAccount with the given name for the given namespace -func (e *ServiceAccountsController) getServiceAccount(name, namespace string) (*api.ServiceAccount, error) { - key := &api.ServiceAccount{ObjectMeta: api.ObjectMeta{Namespace: namespace}} - accounts, err := e.serviceAccounts.Index("namespace", key) - if err != nil { - return nil, err - } - - for _, obj := range accounts { - serviceAccount := obj.(*api.ServiceAccount) - if name == serviceAccount.Name { - return serviceAccount, nil + if _, err := c.client.Core().ServiceAccounts(ns.Name).Create(&sa); err != nil && !apierrs.IsAlreadyExists(err) { + createFailures = append(createFailures, err) } } - return nil, nil -} -// getNamespace returns the Namespace with the given name -func (e *ServiceAccountsController) getNamespace(name string) (*api.Namespace, error) { - key := &api.Namespace{ObjectMeta: api.ObjectMeta{Name: name}} - namespaces, err := e.namespaces.Index("name", key) - if err != nil { - return nil, err - } - - if len(namespaces) == 0 { - return nil, nil - } - if len(namespaces) == 1 { - return namespaces[0].(*api.Namespace), nil - } - return nil, fmt.Errorf("%d namespaces with the name %s indexed", len(namespaces), name) + return utilerrors.Flatten(utilerrors.NewAggregate(createFailures)) } diff --git a/pkg/controller/serviceaccount/serviceaccounts_controller_test.go b/pkg/controller/serviceaccount/serviceaccounts_controller_test.go index b9c7b0d0cba..e6a31fd2b76 100644 --- a/pkg/controller/serviceaccount/serviceaccounts_controller_test.go +++ b/pkg/controller/serviceaccount/serviceaccounts_controller_test.go @@ -18,10 +18,14 @@ package serviceaccount import ( "testing" + "time" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/client/cache" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" "k8s.io/kubernetes/pkg/client/testing/core" + "k8s.io/kubernetes/pkg/controller" + "k8s.io/kubernetes/pkg/controller/informers" "k8s.io/kubernetes/pkg/util/sets" ) @@ -128,6 +132,7 @@ func TestServiceAccountCreation(t *testing.T) { ExpectCreatedServiceAccounts: []string{}, }, "deleted serviceaccount with active namespace": { + ExistingServiceAccounts: []*api.ServiceAccount{managedServiceAccount}, ExistingNamespace: activeNS, DeletedServiceAccount: defaultServiceAccount, ExpectCreatedServiceAccounts: []string{defaultName}, @@ -138,6 +143,7 @@ func TestServiceAccountCreation(t *testing.T) { ExpectCreatedServiceAccounts: []string{}, }, "deleted unmanaged serviceaccount with active namespace": { + ExistingServiceAccounts: []*api.ServiceAccount{defaultServiceAccount, managedServiceAccount}, ExistingNamespace: activeNS, DeletedServiceAccount: unmanagedServiceAccount, ExpectCreatedServiceAccounts: []string{}, @@ -151,32 +157,58 @@ func TestServiceAccountCreation(t *testing.T) { for k, tc := range testcases { client := fake.NewSimpleClientset(defaultServiceAccount, managedServiceAccount) + informers := informers.NewSharedInformerFactory(fake.NewSimpleClientset(), controller.NoResyncPeriodFunc()) options := DefaultServiceAccountsControllerOptions() options.ServiceAccounts = []api.ServiceAccount{ {ObjectMeta: api.ObjectMeta{Name: defaultName}}, {ObjectMeta: api.ObjectMeta{Name: managedName}}, } - controller := NewServiceAccountsController(client, options) + controller := NewServiceAccountsController(informers.ServiceAccounts(), informers.Namespaces(), client, options) + controller.saLister = &cache.StoreToServiceAccountLister{Indexer: cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})} + controller.nsLister = &cache.IndexerToNamespaceLister{Indexer: cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})} + controller.saSynced = alwaysReady + controller.nsSynced = alwaysReady + + syncCalls := make(chan struct{}) + controller.syncHandler = func(key string) error { + err := controller.syncNamespace(key) + if err != nil { + t.Logf("%s: %v", k, err) + } + + syncCalls <- struct{}{} + return err + } + stopCh := make(chan struct{}) + defer close(stopCh) + go controller.Run(1, stopCh) if tc.ExistingNamespace != nil { - controller.namespaces.Add(tc.ExistingNamespace) + controller.nsLister.Add(tc.ExistingNamespace) } for _, s := range tc.ExistingServiceAccounts { - controller.serviceAccounts.Add(s) + controller.saLister.Indexer.Add(s) } if tc.AddedNamespace != nil { - controller.namespaces.Add(tc.AddedNamespace) + controller.nsLister.Add(tc.AddedNamespace) controller.namespaceAdded(tc.AddedNamespace) } if tc.UpdatedNamespace != nil { - controller.namespaces.Add(tc.UpdatedNamespace) + controller.nsLister.Add(tc.UpdatedNamespace) controller.namespaceUpdated(nil, tc.UpdatedNamespace) } if tc.DeletedServiceAccount != nil { controller.serviceAccountDeleted(tc.DeletedServiceAccount) } + // wait to be called + select { + case <-syncCalls: + case <-time.After(10 * time.Second): + t.Errorf("%s: took too long", k) + } + actions := client.Actions() if len(tc.ExpectCreatedServiceAccounts) != len(actions) { t.Errorf("%s: Expected to create accounts %#v. Actual actions were: %#v", k, tc.ExpectCreatedServiceAccounts, actions) @@ -195,3 +227,5 @@ func TestServiceAccountCreation(t *testing.T) { } } } + +var alwaysReady = func() bool { return true } diff --git a/test/integration/serviceaccount/service_account_test.go b/test/integration/serviceaccount/service_account_test.go index 9ae95a3279d..79d7a95092a 100644 --- a/test/integration/serviceaccount/service_account_test.go +++ b/test/integration/serviceaccount/service_account_test.go @@ -40,6 +40,8 @@ import ( "k8s.io/kubernetes/pkg/auth/user" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" "k8s.io/kubernetes/pkg/client/restclient" + "k8s.io/kubernetes/pkg/controller" + "k8s.io/kubernetes/pkg/controller/informers" serviceaccountcontroller "k8s.io/kubernetes/pkg/controller/serviceaccount" "k8s.io/kubernetes/pkg/serviceaccount" "k8s.io/kubernetes/pkg/util/sets" @@ -416,14 +418,16 @@ func startServiceAccountTestServer(t *testing.T) (*clientset.Clientset, restclie stopCh := make(chan struct{}) tokenController := serviceaccountcontroller.NewTokensController(rootClientset, serviceaccountcontroller.TokensControllerOptions{TokenGenerator: serviceaccount.JWTTokenGenerator(serviceAccountKey)}) go tokenController.Run(1, stopCh) - serviceAccountController := serviceaccountcontroller.NewServiceAccountsController(rootClientset, serviceaccountcontroller.DefaultServiceAccountsControllerOptions()) - serviceAccountController.Run() + + informers := informers.NewSharedInformerFactory(rootClientset, controller.NoResyncPeriodFunc()) + serviceAccountController := serviceaccountcontroller.NewServiceAccountsController(informers.ServiceAccounts(), informers.Namespaces(), rootClientset, serviceaccountcontroller.DefaultServiceAccountsControllerOptions()) + informers.Start(stopCh) + go serviceAccountController.Run(5, stopCh) // Start the admission plugin reflectors serviceAccountAdmission.Run() stop := func() { close(stopCh) - serviceAccountController.Stop() serviceAccountAdmission.Stop() apiServer.Close() }