mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-03 09:22:44 +00:00
Merge pull request #8494 from liggitt/gate_tokens_controller
Wait until stores are filled before processing service account token events
This commit is contained in:
commit
80c3506484
17
pkg/client/cache/reflector.go
vendored
17
pkg/client/cache/reflector.go
vendored
@ -20,6 +20,7 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"io"
|
"io"
|
||||||
"reflect"
|
"reflect"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
apierrs "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
|
apierrs "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
|
||||||
@ -53,8 +54,10 @@ type Reflector struct {
|
|||||||
resyncPeriod time.Duration
|
resyncPeriod time.Duration
|
||||||
// lastSyncResourceVersion is the resource version token last
|
// lastSyncResourceVersion is the resource version token last
|
||||||
// observed when doing a sync with the underlying store
|
// observed when doing a sync with the underlying store
|
||||||
// it is not thread safe as it is not synchronized with access to the store
|
// it is thread safe, but not synchronized with the underlying store
|
||||||
lastSyncResourceVersion string
|
lastSyncResourceVersion string
|
||||||
|
// lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion
|
||||||
|
lastSyncResourceVersionMutex sync.RWMutex
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewNamespaceKeyedIndexerAndReflector creates an Indexer and a Reflector
|
// NewNamespaceKeyedIndexerAndReflector creates an Indexer and a Reflector
|
||||||
@ -145,7 +148,7 @@ func (r *Reflector) listAndWatch(stopCh <-chan struct{}) {
|
|||||||
glog.Errorf("Unable to sync list result: %v", err)
|
glog.Errorf("Unable to sync list result: %v", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
r.lastSyncResourceVersion = resourceVersion
|
r.setLastSyncResourceVersion(resourceVersion)
|
||||||
|
|
||||||
for {
|
for {
|
||||||
w, err := r.listerWatcher.Watch(resourceVersion)
|
w, err := r.listerWatcher.Watch(resourceVersion)
|
||||||
@ -225,7 +228,7 @@ loop:
|
|||||||
glog.Errorf("unable to understand watch event %#v", event)
|
glog.Errorf("unable to understand watch event %#v", event)
|
||||||
}
|
}
|
||||||
*resourceVersion = meta.ResourceVersion()
|
*resourceVersion = meta.ResourceVersion()
|
||||||
r.lastSyncResourceVersion = *resourceVersion
|
r.setLastSyncResourceVersion(*resourceVersion)
|
||||||
eventCount++
|
eventCount++
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -242,5 +245,13 @@ loop:
|
|||||||
// LastSyncResourceVersion is the resource version observed when last sync with the underlying store
|
// LastSyncResourceVersion is the resource version observed when last sync with the underlying store
|
||||||
// The value returned is not synchronized with access to the underlying store and is not thread-safe
|
// The value returned is not synchronized with access to the underlying store and is not thread-safe
|
||||||
func (r *Reflector) LastSyncResourceVersion() string {
|
func (r *Reflector) LastSyncResourceVersion() string {
|
||||||
|
r.lastSyncResourceVersionMutex.RLock()
|
||||||
|
defer r.lastSyncResourceVersionMutex.RUnlock()
|
||||||
return r.lastSyncResourceVersion
|
return r.lastSyncResourceVersion
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *Reflector) setLastSyncResourceVersion(v string) {
|
||||||
|
r.lastSyncResourceVersionMutex.Lock()
|
||||||
|
defer r.lastSyncResourceVersionMutex.Unlock()
|
||||||
|
r.lastSyncResourceVersion = v
|
||||||
|
}
|
||||||
|
@ -17,6 +17,7 @@ limitations under the License.
|
|||||||
package framework
|
package framework
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
|
||||||
@ -61,7 +62,9 @@ type ProcessFunc func(obj interface{}) error
|
|||||||
|
|
||||||
// Controller is a generic controller framework.
|
// Controller is a generic controller framework.
|
||||||
type Controller struct {
|
type Controller struct {
|
||||||
config Config
|
config Config
|
||||||
|
reflector *cache.Reflector
|
||||||
|
reflectorMutex sync.RWMutex
|
||||||
}
|
}
|
||||||
|
|
||||||
// New makes a new Controller from the given Config.
|
// New makes a new Controller from the given Config.
|
||||||
@ -77,16 +80,32 @@ func New(c *Config) *Controller {
|
|||||||
// Run blocks; call via go.
|
// Run blocks; call via go.
|
||||||
func (c *Controller) Run(stopCh <-chan struct{}) {
|
func (c *Controller) Run(stopCh <-chan struct{}) {
|
||||||
defer util.HandleCrash()
|
defer util.HandleCrash()
|
||||||
cache.NewReflector(
|
r := cache.NewReflector(
|
||||||
c.config.ListerWatcher,
|
c.config.ListerWatcher,
|
||||||
c.config.ObjectType,
|
c.config.ObjectType,
|
||||||
c.config.Queue,
|
c.config.Queue,
|
||||||
c.config.FullResyncPeriod,
|
c.config.FullResyncPeriod,
|
||||||
).RunUntil(stopCh)
|
)
|
||||||
|
|
||||||
|
c.reflectorMutex.Lock()
|
||||||
|
c.reflector = r
|
||||||
|
c.reflectorMutex.Unlock()
|
||||||
|
|
||||||
|
r.RunUntil(stopCh)
|
||||||
|
|
||||||
util.Until(c.processLoop, time.Second, stopCh)
|
util.Until(c.processLoop, time.Second, stopCh)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Returns true once this controller has completed an initial resource listing
|
||||||
|
func (c *Controller) HasSynced() bool {
|
||||||
|
c.reflectorMutex.RLock()
|
||||||
|
defer c.reflectorMutex.RUnlock()
|
||||||
|
if c.reflector == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return c.reflector.LastSyncResourceVersion() != ""
|
||||||
|
}
|
||||||
|
|
||||||
// processLoop drains the work queue.
|
// processLoop drains the work queue.
|
||||||
// TODO: Consider doing the processing in parallel. This will require a little thought
|
// TODO: Consider doing the processing in parallel. This will require a little thought
|
||||||
// to make sure that we don't end up processing the same object multiple times
|
// to make sure that we don't end up processing the same object multiple times
|
||||||
|
@ -214,10 +214,20 @@ func TestHammerController(t *testing.T) {
|
|||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
|
if controller.HasSynced() {
|
||||||
|
t.Errorf("Expected HasSynced() to return false before we started the controller")
|
||||||
|
}
|
||||||
|
|
||||||
// Run the controller and run it until we close stop.
|
// Run the controller and run it until we close stop.
|
||||||
stop := make(chan struct{})
|
stop := make(chan struct{})
|
||||||
go controller.Run(stop)
|
go controller.Run(stop)
|
||||||
|
|
||||||
|
// Let's wait for the controller to do its initial sync
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
if !controller.HasSynced() {
|
||||||
|
t.Errorf("Expected HasSynced() to return true after the initial sync")
|
||||||
|
}
|
||||||
|
|
||||||
wg := sync.WaitGroup{}
|
wg := sync.WaitGroup{}
|
||||||
const threads = 3
|
const threads = 3
|
||||||
wg.Add(threads)
|
wg.Add(threads)
|
||||||
|
@ -96,6 +96,9 @@ func NewTokensController(cl client.Interface, options TokensControllerOptions) *
|
|||||||
cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc},
|
cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc},
|
||||||
)
|
)
|
||||||
|
|
||||||
|
e.serviceAccountsSynced = e.serviceAccountController.HasSynced
|
||||||
|
e.secretsSynced = e.secretController.HasSynced
|
||||||
|
|
||||||
return e
|
return e
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -112,6 +115,10 @@ type TokensController struct {
|
|||||||
// Since we join two objects, we'll watch both of them with controllers.
|
// Since we join two objects, we'll watch both of them with controllers.
|
||||||
serviceAccountController *framework.Controller
|
serviceAccountController *framework.Controller
|
||||||
secretController *framework.Controller
|
secretController *framework.Controller
|
||||||
|
|
||||||
|
// These are here so tests can inject a 'return true'.
|
||||||
|
serviceAccountsSynced func() bool
|
||||||
|
secretsSynced func() bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// Runs controller loops and returns immediately
|
// Runs controller loops and returns immediately
|
||||||
@ -133,6 +140,9 @@ func (e *TokensController) Stop() {
|
|||||||
|
|
||||||
// serviceAccountAdded reacts to a ServiceAccount creation by creating a corresponding ServiceAccountToken Secret
|
// serviceAccountAdded reacts to a ServiceAccount creation by creating a corresponding ServiceAccountToken Secret
|
||||||
func (e *TokensController) serviceAccountAdded(obj interface{}) {
|
func (e *TokensController) serviceAccountAdded(obj interface{}) {
|
||||||
|
if !e.secretsSynced() {
|
||||||
|
return
|
||||||
|
}
|
||||||
serviceAccount := obj.(*api.ServiceAccount)
|
serviceAccount := obj.(*api.ServiceAccount)
|
||||||
err := e.createSecretIfNeeded(serviceAccount)
|
err := e.createSecretIfNeeded(serviceAccount)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -142,6 +152,9 @@ func (e *TokensController) serviceAccountAdded(obj interface{}) {
|
|||||||
|
|
||||||
// serviceAccountUpdated reacts to a ServiceAccount update (or re-list) by ensuring a corresponding ServiceAccountToken Secret exists
|
// serviceAccountUpdated reacts to a ServiceAccount update (or re-list) by ensuring a corresponding ServiceAccountToken Secret exists
|
||||||
func (e *TokensController) serviceAccountUpdated(oldObj interface{}, newObj interface{}) {
|
func (e *TokensController) serviceAccountUpdated(oldObj interface{}, newObj interface{}) {
|
||||||
|
if !e.secretsSynced() {
|
||||||
|
return
|
||||||
|
}
|
||||||
newServiceAccount := newObj.(*api.ServiceAccount)
|
newServiceAccount := newObj.(*api.ServiceAccount)
|
||||||
err := e.createSecretIfNeeded(newServiceAccount)
|
err := e.createSecretIfNeeded(newServiceAccount)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -171,6 +184,9 @@ func (e *TokensController) serviceAccountDeleted(obj interface{}) {
|
|||||||
|
|
||||||
// secretAdded reacts to a Secret create by ensuring the referenced ServiceAccount exists, and by adding a token to the secret if needed
|
// secretAdded reacts to a Secret create by ensuring the referenced ServiceAccount exists, and by adding a token to the secret if needed
|
||||||
func (e *TokensController) secretAdded(obj interface{}) {
|
func (e *TokensController) secretAdded(obj interface{}) {
|
||||||
|
if !e.serviceAccountsSynced() {
|
||||||
|
return
|
||||||
|
}
|
||||||
secret := obj.(*api.Secret)
|
secret := obj.(*api.Secret)
|
||||||
serviceAccount, err := e.getServiceAccount(secret)
|
serviceAccount, err := e.getServiceAccount(secret)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -188,6 +204,9 @@ func (e *TokensController) secretAdded(obj interface{}) {
|
|||||||
|
|
||||||
// secretUpdated reacts to a Secret update (or re-list) by deleting the secret (if the referenced ServiceAccount does not exist)
|
// secretUpdated reacts to a Secret update (or re-list) by deleting the secret (if the referenced ServiceAccount does not exist)
|
||||||
func (e *TokensController) secretUpdated(oldObj interface{}, newObj interface{}) {
|
func (e *TokensController) secretUpdated(oldObj interface{}, newObj interface{}) {
|
||||||
|
if !e.serviceAccountsSynced() {
|
||||||
|
return
|
||||||
|
}
|
||||||
newSecret := newObj.(*api.Secret)
|
newSecret := newObj.(*api.Secret)
|
||||||
newServiceAccount, err := e.getServiceAccount(newSecret)
|
newServiceAccount, err := e.getServiceAccount(newSecret)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -335,6 +335,10 @@ func TestTokenCreation(t *testing.T) {
|
|||||||
|
|
||||||
controller := NewTokensController(client, DefaultTokenControllerOptions(generator))
|
controller := NewTokensController(client, DefaultTokenControllerOptions(generator))
|
||||||
|
|
||||||
|
// Tell the token controller its stores have been synced
|
||||||
|
controller.serviceAccountsSynced = func() bool { return true }
|
||||||
|
controller.secretsSynced = func() bool { return true }
|
||||||
|
|
||||||
if tc.ExistingServiceAccount != nil {
|
if tc.ExistingServiceAccount != nil {
|
||||||
controller.serviceAccounts.Add(tc.ExistingServiceAccount)
|
controller.serviceAccounts.Add(tc.ExistingServiceAccount)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user