diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 9f347136ee0..a6c370bbd85 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -397,14 +397,20 @@ func StartControllers(controllers map[string]InitFunc, s *options.CMServer, root rootCA = rootClientBuilder.ConfigOrDie("tokens-controller").CAData } - go serviceaccountcontroller.NewTokensController( + controller := serviceaccountcontroller.NewTokensController( + sharedInformers.Core().V1().ServiceAccounts(), + sharedInformers.Core().V1().Secrets(), rootClientBuilder.ClientOrDie("tokens-controller"), serviceaccountcontroller.TokensControllerOptions{ TokenGenerator: serviceaccount.JWTTokenGenerator(privateKey), RootCA: rootCA, }, - ).Run(int(s.ConcurrentSATokenSyncs), stop) + ) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) + go controller.Run(int(s.ConcurrentSATokenSyncs), stop) + + // start the first set of informers now so that other controllers can start + sharedInformers.Start(stop) } } else { diff --git a/pkg/controller/serviceaccount/tokens_controller.go b/pkg/controller/serviceaccount/tokens_controller.go index 8850628523e..71df8e2eaee 100644 --- a/pkg/controller/serviceaccount/tokens_controller.go +++ b/pkg/controller/serviceaccount/tokens_controller.go @@ -24,20 +24,19 @@ import ( "github.com/golang/glog" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/fields" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" utilerrors "k8s.io/apimachinery/pkg/util/errors" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" - "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" + informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/core/v1" + listersv1 "k8s.io/kubernetes/pkg/client/listers/core/v1" clientretry "k8s.io/kubernetes/pkg/client/retry" + "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/registry/core/secret" "k8s.io/kubernetes/pkg/serviceaccount" "k8s.io/kubernetes/pkg/util/metrics" @@ -71,7 +70,7 @@ type TokensControllerOptions struct { } // NewTokensController returns a new *TokensController. -func NewTokensController(cl clientset.Interface, options TokensControllerOptions) *TokensController { +func NewTokensController(serviceAccounts informers.ServiceAccountInformer, secrets informers.SecretInformer, cl clientset.Interface, options TokensControllerOptions) *TokensController { maxRetries := options.MaxRetries if maxRetries == 0 { maxRetries = 10 @@ -91,44 +90,38 @@ func NewTokensController(cl clientset.Interface, options TokensControllerOptions metrics.RegisterMetricAndTrackRateLimiterUsage("serviceaccount_controller", cl.Core().RESTClient().GetRateLimiter()) } - e.serviceAccounts, e.serviceAccountController = cache.NewInformer( - &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - return e.client.Core().ServiceAccounts(metav1.NamespaceAll).List(options) - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - return e.client.Core().ServiceAccounts(metav1.NamespaceAll).Watch(options) - }, - }, - &v1.ServiceAccount{}, - options.ServiceAccountResync, + e.serviceAccounts = serviceAccounts.Lister() + e.serviceAccountSynced = serviceAccounts.Informer().HasSynced + serviceAccounts.Informer().AddEventHandlerWithResyncPeriod( cache.ResourceEventHandlerFuncs{ AddFunc: e.queueServiceAccountSync, UpdateFunc: e.queueServiceAccountUpdateSync, DeleteFunc: e.queueServiceAccountSync, }, + options.ServiceAccountResync, ) - tokenSelector := fields.SelectorFromSet(map[string]string{api.SecretTypeField: string(v1.SecretTypeServiceAccountToken)}) - e.secrets, e.secretController = cache.NewIndexerInformer( - &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - options.FieldSelector = tokenSelector.String() - return e.client.Core().Secrets(metav1.NamespaceAll).List(options) + secretCache := secrets.Informer().GetIndexer() + e.updatedSecrets = cache.NewIntegerResourceVersionMutationCache(secretCache, secretCache, 60*time.Second, true) + e.secretSynced = secrets.Informer().HasSynced + secrets.Informer().AddEventHandlerWithResyncPeriod( + cache.FilteringResourceEventHandler{ + FilterFunc: func(obj interface{}) bool { + switch t := obj.(type) { + case *v1.Secret: + return t.Type == v1.SecretTypeServiceAccountToken + default: + utilruntime.HandleError(fmt.Errorf("object passed to %T that is not expected: %T", e, obj)) + return false + } }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - options.FieldSelector = tokenSelector.String() - return e.client.Core().Secrets(metav1.NamespaceAll).Watch(options) + Handler: cache.ResourceEventHandlerFuncs{ + AddFunc: e.queueSecretSync, + UpdateFunc: e.queueSecretUpdateSync, + DeleteFunc: e.queueSecretSync, }, }, - &v1.Secret{}, options.SecretResync, - cache.ResourceEventHandlerFuncs{ - AddFunc: e.queueSecretSync, - UpdateFunc: e.queueSecretUpdateSync, - DeleteFunc: e.queueSecretSync, - }, - cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc}, ) return e @@ -141,12 +134,15 @@ type TokensController struct { rootCA []byte - serviceAccounts cache.Store - secrets cache.Indexer + serviceAccounts listersv1.ServiceAccountLister + // updatedSecrets is a wrapper around the shared cache which allows us to record + // and return our local mutations (since we're very likely to act on an updated + // secret before the watch reports it). + updatedSecrets cache.MutationCache // Since we join two objects, we'll watch both of them with controllers. - serviceAccountController cache.Controller - secretController cache.Controller + serviceAccountSynced cache.InformerSynced + secretSynced cache.InformerSynced // syncServiceAccountQueue handles service account events: // * ensures a referenced token exists for service accounts which still exist @@ -166,29 +162,22 @@ type TokensController struct { // Runs controller blocks until stopCh is closed func (e *TokensController) Run(workers int, stopCh <-chan struct{}) { + // Shut down queues defer utilruntime.HandleCrash() + defer e.syncServiceAccountQueue.ShutDown() + defer e.syncSecretQueue.ShutDown() - // Start controllers (to fill stores, call informers, fill work queues) - go e.serviceAccountController.Run(stopCh) - go e.secretController.Run(stopCh) - - // Wait for stores to fill - for !e.serviceAccountController.HasSynced() || !e.secretController.HasSynced() { - time.Sleep(100 * time.Millisecond) + if !controller.WaitForCacheSync("tokens", stopCh, e.serviceAccountSynced, e.secretSynced) { + return } - // Spawn workers to process work queues + glog.V(5).Infof("Starting workers") for i := 0; i < workers; i++ { go wait.Until(e.syncServiceAccount, 0, stopCh) go wait.Until(e.syncSecret, 0, stopCh) } - - // Block until stop channel is closed <-stopCh - - // Shut down queues - e.syncServiceAccountQueue.ShutDown() - e.syncSecretQueue.ShutDown() + glog.V(1).Infof("Shutting down") } func (e *TokensController) queueServiceAccountSync(obj interface{}) { @@ -423,7 +412,7 @@ func (e *TokensController) ensureReferencedToken(serviceAccount *v1.ServiceAccou } // Manually add the new token to the cache store. // This prevents the service account update (below) triggering another token creation, if the referenced token couldn't be found in the store - e.secrets.Add(createdToken) + e.updatedSecrets.Mutation(createdToken) // Try to add a reference to the newly created token to the service account addedReference := false @@ -626,15 +615,11 @@ func (e *TokensController) removeSecretReference(saNamespace string, saName stri func (e *TokensController) getServiceAccount(ns string, name string, uid types.UID, fetchOnCacheMiss bool) (*v1.ServiceAccount, error) { // Look up in cache - obj, exists, err := e.serviceAccounts.GetByKey(makeCacheKey(ns, name)) - if err != nil { + sa, err := e.serviceAccounts.ServiceAccounts(ns).Get(name) + if err != nil && !apierrors.IsNotFound(err) { return nil, err } - if exists { - sa, ok := obj.(*v1.ServiceAccount) - if !ok { - return nil, fmt.Errorf("expected *v1.ServiceAccount, got %#v", sa) - } + if sa != nil { // Ensure UID matches if given if len(uid) == 0 || uid == sa.UID { return sa, nil @@ -646,7 +631,7 @@ func (e *TokensController) getServiceAccount(ns string, name string, uid types.U } // Live lookup - sa, err := e.client.Core().ServiceAccounts(ns).Get(name, metav1.GetOptions{}) + sa, err = e.client.Core().ServiceAccounts(ns).Get(name, metav1.GetOptions{}) if apierrors.IsNotFound(err) { return nil, nil } @@ -662,7 +647,7 @@ func (e *TokensController) getServiceAccount(ns string, name string, uid types.U func (e *TokensController) getSecret(ns string, name string, uid types.UID, fetchOnCacheMiss bool) (*v1.Secret, error) { // Look up in cache - obj, exists, err := e.secrets.GetByKey(makeCacheKey(ns, name)) + obj, exists, err := e.updatedSecrets.GetByKey(makeCacheKey(ns, name)) if err != nil { return nil, err } @@ -699,7 +684,7 @@ func (e *TokensController) getSecret(ns string, name string, uid types.UID, fetc // listTokenSecrets returns a list of all of the ServiceAccountToken secrets that // reference the given service account's name and uid func (e *TokensController) listTokenSecrets(serviceAccount *v1.ServiceAccount) ([]*v1.Secret, error) { - namespaceSecrets, err := e.secrets.ByIndex("namespace", serviceAccount.Namespace) + namespaceSecrets, err := e.updatedSecrets.ByIndex("namespace", serviceAccount.Namespace) if err != nil { return nil, err } diff --git a/pkg/controller/serviceaccount/tokens_controller_test.go b/pkg/controller/serviceaccount/tokens_controller_test.go index 06b9e375b40..66308cb79d0 100644 --- a/pkg/controller/serviceaccount/tokens_controller_test.go +++ b/pkg/controller/serviceaccount/tokens_controller_test.go @@ -34,6 +34,8 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake" + informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions" + "k8s.io/kubernetes/pkg/controller" ) type testGenerator struct { @@ -220,6 +222,7 @@ func TestTokenCreation(t *testing.T) { UpdatedServiceAccount *v1.ServiceAccount DeletedServiceAccount *v1.ServiceAccount AddedSecret *v1.Secret + AddedSecretLocal *v1.Secret UpdatedSecret *v1.Secret DeletedSecret *v1.Secret @@ -306,6 +309,13 @@ func TestTokenCreation(t *testing.T) { core.NewUpdateAction(schema.GroupVersionResource{Version: "v1", Resource: "serviceaccounts"}, metav1.NamespaceDefault, serviceAccount(addTokenSecretReference(missingSecretReferences()))), }, }, + "new serviceaccount with missing secrets and a local secret in the cache": { + ClientObjects: []runtime.Object{serviceAccount(missingSecretReferences())}, + + AddedServiceAccount: serviceAccount(tokenSecretReferences()), + AddedSecretLocal: serviceAccountTokenSecret(), + ExpectedActions: []core.Action{}, + }, "new serviceaccount with non-token secrets": { ClientObjects: []runtime.Object{serviceAccount(regularSecretReferences()), opaqueSecret()}, @@ -572,38 +582,44 @@ func TestTokenCreation(t *testing.T) { for _, reactor := range tc.Reactors { client.Fake.PrependReactor(reactor.verb, reactor.resource, reactor.reactor(t)) } - - controller := NewTokensController(client, TokensControllerOptions{TokenGenerator: generator, RootCA: []byte("CA Data"), MaxRetries: tc.MaxRetries}) + informers := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc()) + secretInformer := informers.Core().V1().Secrets().Informer() + secrets := secretInformer.GetStore() + serviceAccounts := informers.Core().V1().ServiceAccounts().Informer().GetStore() + controller := NewTokensController(informers.Core().V1().ServiceAccounts(), informers.Core().V1().Secrets(), client, TokensControllerOptions{TokenGenerator: generator, RootCA: []byte("CA Data"), MaxRetries: tc.MaxRetries}) if tc.ExistingServiceAccount != nil { - controller.serviceAccounts.Add(tc.ExistingServiceAccount) + serviceAccounts.Add(tc.ExistingServiceAccount) } for _, s := range tc.ExistingSecrets { - controller.secrets.Add(s) + secrets.Add(s) } if tc.AddedServiceAccount != nil { - controller.serviceAccounts.Add(tc.AddedServiceAccount) + serviceAccounts.Add(tc.AddedServiceAccount) controller.queueServiceAccountSync(tc.AddedServiceAccount) } if tc.UpdatedServiceAccount != nil { - controller.serviceAccounts.Add(tc.UpdatedServiceAccount) + serviceAccounts.Add(tc.UpdatedServiceAccount) controller.queueServiceAccountUpdateSync(nil, tc.UpdatedServiceAccount) } if tc.DeletedServiceAccount != nil { - controller.serviceAccounts.Delete(tc.DeletedServiceAccount) + serviceAccounts.Delete(tc.DeletedServiceAccount) controller.queueServiceAccountSync(tc.DeletedServiceAccount) } if tc.AddedSecret != nil { - controller.secrets.Add(tc.AddedSecret) + secrets.Add(tc.AddedSecret) controller.queueSecretSync(tc.AddedSecret) } + if tc.AddedSecretLocal != nil { + controller.updatedSecrets.Mutation(tc.AddedSecretLocal) + } if tc.UpdatedSecret != nil { - controller.secrets.Add(tc.UpdatedSecret) + secrets.Add(tc.UpdatedSecret) controller.queueSecretUpdateSync(nil, tc.UpdatedSecret) } if tc.DeletedSecret != nil { - controller.secrets.Delete(tc.DeletedSecret) + secrets.Delete(tc.DeletedSecret) controller.queueSecretSync(tc.DeletedSecret) } diff --git a/test/integration/serviceaccount/service_account_test.go b/test/integration/serviceaccount/service_account_test.go index 91e058e92c4..ce90fee2adf 100644 --- a/test/integration/serviceaccount/service_account_test.go +++ b/test/integration/serviceaccount/service_account_test.go @@ -408,6 +408,7 @@ func startServiceAccountTestServer(t *testing.T) (*clientset.Clientset, restclie serviceAccountAdmission.SetInternalKubeClientSet(internalRootClientset) internalInformers := internalinformers.NewSharedInformerFactory(internalRootClientset, controller.NoResyncPeriodFunc()) serviceAccountAdmission.SetInternalKubeInformerFactory(internalInformers) + informers := informers.NewSharedInformerFactory(rootClientset, controller.NoResyncPeriodFunc()) masterConfig := framework.NewMasterConfig() masterConfig.GenericConfig.EnableIndex = true @@ -418,10 +419,14 @@ func startServiceAccountTestServer(t *testing.T) (*clientset.Clientset, restclie // Start the service account and service account token controllers stopCh := make(chan struct{}) - tokenController := serviceaccountcontroller.NewTokensController(rootClientset, serviceaccountcontroller.TokensControllerOptions{TokenGenerator: serviceaccount.JWTTokenGenerator(serviceAccountKey)}) + tokenController := serviceaccountcontroller.NewTokensController( + informers.Core().V1().ServiceAccounts(), + informers.Core().V1().Secrets(), + rootClientset, + serviceaccountcontroller.TokensControllerOptions{TokenGenerator: serviceaccount.JWTTokenGenerator(serviceAccountKey)}, + ) go tokenController.Run(1, stopCh) - informers := informers.NewSharedInformerFactory(rootClientset, controller.NoResyncPeriodFunc()) serviceAccountController := serviceaccountcontroller.NewServiceAccountsController( informers.Core().V1().ServiceAccounts(), informers.Core().V1().Namespaces(),