From 49ceb82179a36fefe535bbf355c483312212df24 Mon Sep 17 00:00:00 2001 From: Jordan Liggitt Date: Tue, 19 May 2015 05:24:17 -0400 Subject: [PATCH] Wait until stores are filled before processing service account token events --- pkg/client/cache/reflector.go | 17 ++++++++++--- pkg/controller/framework/controller.go | 25 +++++++++++++++++--- pkg/controller/framework/controller_test.go | 10 ++++++++ pkg/serviceaccount/tokens_controller.go | 19 +++++++++++++++ pkg/serviceaccount/tokens_controller_test.go | 4 ++++ 5 files changed, 69 insertions(+), 6 deletions(-) diff --git a/pkg/client/cache/reflector.go b/pkg/client/cache/reflector.go index 340d73a1359..e7c502f4552 100644 --- a/pkg/client/cache/reflector.go +++ b/pkg/client/cache/reflector.go @@ -20,6 +20,7 @@ import ( "errors" "io" "reflect" + "sync" "time" apierrs "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" @@ -53,8 +54,10 @@ type Reflector struct { resyncPeriod time.Duration // lastSyncResourceVersion is the resource version token last // observed when doing a sync with the underlying store - // it is not thread safe as it is not synchronized with access to the store + // it is thread safe, but not synchronized with the underlying store lastSyncResourceVersion string + // lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion + lastSyncResourceVersionMutex sync.RWMutex } // NewNamespaceKeyedIndexerAndReflector creates an Indexer and a Reflector @@ -145,7 +148,7 @@ func (r *Reflector) listAndWatch(stopCh <-chan struct{}) { glog.Errorf("Unable to sync list result: %v", err) return } - r.lastSyncResourceVersion = resourceVersion + r.setLastSyncResourceVersion(resourceVersion) for { w, err := r.listerWatcher.Watch(resourceVersion) @@ -225,7 +228,7 @@ loop: glog.Errorf("unable to understand watch event %#v", event) } *resourceVersion = meta.ResourceVersion() - r.lastSyncResourceVersion = *resourceVersion + r.setLastSyncResourceVersion(*resourceVersion) eventCount++ } } @@ -242,5 +245,13 @@ loop: // LastSyncResourceVersion is the resource version observed when last sync with the underlying store // The value returned is not synchronized with access to the underlying store and is not thread-safe func (r *Reflector) LastSyncResourceVersion() string { + r.lastSyncResourceVersionMutex.RLock() + defer r.lastSyncResourceVersionMutex.RUnlock() return r.lastSyncResourceVersion } + +func (r *Reflector) setLastSyncResourceVersion(v string) { + r.lastSyncResourceVersionMutex.Lock() + defer r.lastSyncResourceVersionMutex.Unlock() + r.lastSyncResourceVersion = v +} diff --git a/pkg/controller/framework/controller.go b/pkg/controller/framework/controller.go index 8f6f2e81269..84d6b5ef090 100644 --- a/pkg/controller/framework/controller.go +++ b/pkg/controller/framework/controller.go @@ -17,6 +17,7 @@ limitations under the License. package framework import ( + "sync" "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" @@ -61,7 +62,9 @@ type ProcessFunc func(obj interface{}) error // Controller is a generic controller framework. type Controller struct { - config Config + config Config + reflector *cache.Reflector + reflectorMutex sync.RWMutex } // New makes a new Controller from the given Config. @@ -77,16 +80,32 @@ func New(c *Config) *Controller { // Run blocks; call via go. func (c *Controller) Run(stopCh <-chan struct{}) { defer util.HandleCrash() - cache.NewReflector( + r := cache.NewReflector( c.config.ListerWatcher, c.config.ObjectType, c.config.Queue, c.config.FullResyncPeriod, - ).RunUntil(stopCh) + ) + + c.reflectorMutex.Lock() + c.reflector = r + c.reflectorMutex.Unlock() + + r.RunUntil(stopCh) util.Until(c.processLoop, time.Second, stopCh) } +// Returns true once this controller has completed an initial resource listing +func (c *Controller) HasSynced() bool { + c.reflectorMutex.RLock() + defer c.reflectorMutex.RUnlock() + if c.reflector == nil { + return false + } + return c.reflector.LastSyncResourceVersion() != "" +} + // processLoop drains the work queue. // TODO: Consider doing the processing in parallel. This will require a little thought // to make sure that we don't end up processing the same object multiple times diff --git a/pkg/controller/framework/controller_test.go b/pkg/controller/framework/controller_test.go index 95d3611fe95..9f35c23304e 100644 --- a/pkg/controller/framework/controller_test.go +++ b/pkg/controller/framework/controller_test.go @@ -214,10 +214,20 @@ func TestHammerController(t *testing.T) { }, ) + if controller.HasSynced() { + t.Errorf("Expected HasSynced() to return false before we started the controller") + } + // Run the controller and run it until we close stop. stop := make(chan struct{}) go controller.Run(stop) + // Let's wait for the controller to do its initial sync + time.Sleep(100 * time.Millisecond) + if !controller.HasSynced() { + t.Errorf("Expected HasSynced() to return true after the initial sync") + } + wg := sync.WaitGroup{} const threads = 3 wg.Add(threads) diff --git a/pkg/serviceaccount/tokens_controller.go b/pkg/serviceaccount/tokens_controller.go index 5c3c3fc9cdb..d6cf8bb475b 100644 --- a/pkg/serviceaccount/tokens_controller.go +++ b/pkg/serviceaccount/tokens_controller.go @@ -96,6 +96,9 @@ func NewTokensController(cl client.Interface, options TokensControllerOptions) * cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc}, ) + e.serviceAccountsSynced = e.serviceAccountController.HasSynced + e.secretsSynced = e.secretController.HasSynced + return e } @@ -112,6 +115,10 @@ type TokensController struct { // Since we join two objects, we'll watch both of them with controllers. serviceAccountController *framework.Controller secretController *framework.Controller + + // These are here so tests can inject a 'return true'. + serviceAccountsSynced func() bool + secretsSynced func() bool } // Runs controller loops and returns immediately @@ -133,6 +140,9 @@ func (e *TokensController) Stop() { // serviceAccountAdded reacts to a ServiceAccount creation by creating a corresponding ServiceAccountToken Secret func (e *TokensController) serviceAccountAdded(obj interface{}) { + if !e.secretsSynced() { + return + } serviceAccount := obj.(*api.ServiceAccount) err := e.createSecretIfNeeded(serviceAccount) if err != nil { @@ -142,6 +152,9 @@ func (e *TokensController) serviceAccountAdded(obj interface{}) { // serviceAccountUpdated reacts to a ServiceAccount update (or re-list) by ensuring a corresponding ServiceAccountToken Secret exists func (e *TokensController) serviceAccountUpdated(oldObj interface{}, newObj interface{}) { + if !e.secretsSynced() { + return + } newServiceAccount := newObj.(*api.ServiceAccount) err := e.createSecretIfNeeded(newServiceAccount) if err != nil { @@ -171,6 +184,9 @@ func (e *TokensController) serviceAccountDeleted(obj interface{}) { // secretAdded reacts to a Secret create by ensuring the referenced ServiceAccount exists, and by adding a token to the secret if needed func (e *TokensController) secretAdded(obj interface{}) { + if !e.serviceAccountsSynced() { + return + } secret := obj.(*api.Secret) serviceAccount, err := e.getServiceAccount(secret) if err != nil { @@ -188,6 +204,9 @@ func (e *TokensController) secretAdded(obj interface{}) { // secretUpdated reacts to a Secret update (or re-list) by deleting the secret (if the referenced ServiceAccount does not exist) func (e *TokensController) secretUpdated(oldObj interface{}, newObj interface{}) { + if !e.serviceAccountsSynced() { + return + } newSecret := newObj.(*api.Secret) newServiceAccount, err := e.getServiceAccount(newSecret) if err != nil { diff --git a/pkg/serviceaccount/tokens_controller_test.go b/pkg/serviceaccount/tokens_controller_test.go index 041d52575c6..e34cab5809d 100644 --- a/pkg/serviceaccount/tokens_controller_test.go +++ b/pkg/serviceaccount/tokens_controller_test.go @@ -335,6 +335,10 @@ func TestTokenCreation(t *testing.T) { controller := NewTokensController(client, DefaultTokenControllerOptions(generator)) + // Tell the token controller its stores have been synced + controller.serviceAccountsSynced = func() bool { return true } + controller.secretsSynced = func() bool { return true } + if tc.ExistingServiceAccount != nil { controller.serviceAccounts.Add(tc.ExistingServiceAccount) }