Merge pull request #23858 from liggitt/satoken-queue

Automatic merge from submit-queue

Convert service account token controller to use a work queue

Converts the service account token controller to use a work queue. This allows parallelization of token generation (useful when there are several simultaneous namespaces or service accounts being created). It also lets us requeue failures to be retried sooned than the next sync period (which can be very long).

Fixes an issue seen when a namespace is created with secrets quotaed, and the token controller tries to create a token secret prior to the quota status having been initialized. In that case, the secret is rejected at admission, and the token controller wasn't retrying until the resync period.
This commit is contained in:
k8s-merge-robot 2016-06-27 16:43:14 -07:00 committed by GitHub
commit 3e5cdd796c
10 changed files with 1401 additions and 1025 deletions

View File

@ -439,13 +439,13 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
if err != nil {
glog.Errorf("Error reading key for service account token controller: %v", err)
} else {
serviceaccountcontroller.NewTokensController(
go serviceaccountcontroller.NewTokensController(
clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "tokens-controller")),
serviceaccountcontroller.TokensControllerOptions{
TokenGenerator: serviceaccount.JWTTokenGenerator(privateKey),
RootCA: rootCA,
},
).Run()
).Run(int(s.ConcurrentSATokenSyncs), wait.NeverStop)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
}
}

View File

@ -53,6 +53,7 @@ func NewCMServer() *CMServer {
ConcurrentResourceQuotaSyncs: 5,
ConcurrentDeploymentSyncs: 5,
ConcurrentNamespaceSyncs: 2,
ConcurrentSATokenSyncs: 5,
LookupCacheSizeForRC: 4096,
LookupCacheSizeForRS: 4096,
LookupCacheSizeForDaemonSet: 1024,
@ -108,6 +109,7 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) {
fs.Int32Var(&s.ConcurrentResourceQuotaSyncs, "concurrent-resource-quota-syncs", s.ConcurrentResourceQuotaSyncs, "The number of resource quotas that are allowed to sync concurrently. Larger number = more responsive quota management, but more CPU (and network) load")
fs.Int32Var(&s.ConcurrentDeploymentSyncs, "concurrent-deployment-syncs", s.ConcurrentDeploymentSyncs, "The number of deployment objects that are allowed to sync concurrently. Larger number = more responsive deployments, but more CPU (and network) load")
fs.Int32Var(&s.ConcurrentNamespaceSyncs, "concurrent-namespace-syncs", s.ConcurrentNamespaceSyncs, "The number of namespace objects that are allowed to sync concurrently. Larger number = more responsive namespace termination, but more CPU (and network) load")
fs.Int32Var(&s.ConcurrentSATokenSyncs, "concurrent-serviceaccount-token-syncs", s.ConcurrentSATokenSyncs, "The number of service account token objects that are allowed to sync concurrently. Larger number = more responsive token generation, but more CPU (and network) load")
fs.Int32Var(&s.LookupCacheSizeForRC, "replication-controller-lookup-cache-size", s.LookupCacheSizeForRC, "The the size of lookup cache for replication controllers. Larger number = more responsive replica management, but more MEM load.")
fs.Int32Var(&s.LookupCacheSizeForRS, "replicaset-lookup-cache-size", s.LookupCacheSizeForRS, "The the size of lookup cache for replicatsets. Larger number = more responsive replica management, but more MEM load.")
fs.Int32Var(&s.LookupCacheSizeForDaemonSet, "daemonset-lookup-cache-size", s.LookupCacheSizeForDaemonSet, "The the size of lookup cache for daemonsets. Larger number = more responsive daemonsets, but more MEM load.")

View File

@ -310,13 +310,13 @@ func (s *CMServer) Run(_ []string) error {
if err != nil {
glog.Errorf("Error reading key for service account token controller: %v", err)
} else {
serviceaccountcontroller.NewTokensController(
go serviceaccountcontroller.NewTokensController(
clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "tokens-controller")),
serviceaccountcontroller.TokensControllerOptions{
TokenGenerator: serviceaccount.JWTTokenGenerator(privateKey),
RootCA: rootCA,
},
).Run()
).Run(int(s.ConcurrentSATokenSyncs), wait.NeverStop)
}
}

View File

@ -65,6 +65,7 @@ concurrent-namespace-syncs
concurrent-replicaset-syncs
concurrent-service-syncs
concurrent-resource-quota-syncs
concurrent-serviceaccount-token-syncs
config-sync-period
configure-cbr0
configure-cloud-routes

View File

@ -70,6 +70,7 @@ func DeepCopy_componentconfig_KubeControllerManagerConfiguration(in KubeControll
out.ConcurrentDaemonSetSyncs = in.ConcurrentDaemonSetSyncs
out.ConcurrentJobSyncs = in.ConcurrentJobSyncs
out.ConcurrentNamespaceSyncs = in.ConcurrentNamespaceSyncs
out.ConcurrentSATokenSyncs = in.ConcurrentSATokenSyncs
out.LookupCacheSizeForRC = in.LookupCacheSizeForRC
out.LookupCacheSizeForRS = in.LookupCacheSizeForRS
out.LookupCacheSizeForDaemonSet = in.LookupCacheSizeForDaemonSet

File diff suppressed because it is too large Load Diff

View File

@ -471,6 +471,9 @@ type KubeControllerManagerConfiguration struct {
// concurrentNamespaceSyncs is the number of namespace objects that are
// allowed to sync concurrently.
ConcurrentNamespaceSyncs int32 `json:"concurrentNamespaceSyncs"`
// concurrentSATokenSyncs is the number of service account token syncing operations
// that will be done concurrently.
ConcurrentSATokenSyncs int32 `json:"concurrentSATokenSyncs"`
// lookupCacheSizeForRC is the size of lookup cache for replication controllers.
// Larger number = more responsive replica management, but more MEM load.
LookupCacheSizeForRC int32 `json:"lookupCacheSizeForRC"`

View File

@ -32,10 +32,13 @@ import (
"k8s.io/kubernetes/pkg/registry/secret"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/serviceaccount"
"k8s.io/kubernetes/pkg/types"
utilerrors "k8s.io/kubernetes/pkg/util/errors"
"k8s.io/kubernetes/pkg/util/metrics"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/util/workqueue"
"k8s.io/kubernetes/pkg/watch"
)
@ -58,21 +61,36 @@ type TokensControllerOptions struct {
// SecretResync is the time.Duration at which to fully re-list secrets.
// If zero, re-list will be delayed as long as possible
SecretResync time.Duration
// This CA will be added in the secretes of service accounts
// This CA will be added in the secrets of service accounts
RootCA []byte
// MaxRetries controls the maximum number of times a particular key is retried before giving up
// If zero, a default max is used
MaxRetries int
}
// NewTokensController returns a new *TokensController.
func NewTokensController(cl clientset.Interface, options TokensControllerOptions) *TokensController {
maxRetries := options.MaxRetries
if maxRetries == 0 {
maxRetries = 10
}
e := &TokensController{
client: cl,
token: options.TokenGenerator,
rootCA: options.RootCA,
syncServiceAccountQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
syncSecretQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
maxRetries: maxRetries,
}
if cl != nil && cl.Core().GetRESTClient().GetRateLimiter() != nil {
metrics.RegisterMetricAndTrackRateLimiterUsage("serviceaccount_controller", cl.Core().GetRESTClient().GetRateLimiter())
}
e.serviceAccounts, e.serviceAccountController = framework.NewIndexerInformer(
e.serviceAccounts, e.serviceAccountController = framework.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return e.client.Core().ServiceAccounts(api.NamespaceAll).List(options)
@ -84,11 +102,10 @@ func NewTokensController(cl clientset.Interface, options TokensControllerOptions
&api.ServiceAccount{},
options.ServiceAccountResync,
framework.ResourceEventHandlerFuncs{
AddFunc: e.serviceAccountAdded,
UpdateFunc: e.serviceAccountUpdated,
DeleteFunc: e.serviceAccountDeleted,
AddFunc: e.queueServiceAccountSync,
UpdateFunc: e.queueServiceAccountUpdateSync,
DeleteFunc: e.queueServiceAccountSync,
},
cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc},
)
tokenSelector := fields.SelectorFromSet(map[string]string{api.SecretTypeField: string(api.SecretTypeServiceAccountToken)})
@ -106,206 +123,277 @@ func NewTokensController(cl clientset.Interface, options TokensControllerOptions
&api.Secret{},
options.SecretResync,
framework.ResourceEventHandlerFuncs{
AddFunc: e.secretAdded,
UpdateFunc: e.secretUpdated,
DeleteFunc: e.secretDeleted,
AddFunc: e.queueSecretSync,
UpdateFunc: e.queueSecretUpdateSync,
DeleteFunc: e.queueSecretSync,
},
cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc},
)
e.serviceAccountsSynced = e.serviceAccountController.HasSynced
e.secretsSynced = e.secretController.HasSynced
return e
}
// TokensController manages ServiceAccountToken secrets for ServiceAccount objects
type TokensController struct {
stopChan chan struct{}
client clientset.Interface
token serviceaccount.TokenGenerator
rootCA []byte
serviceAccounts cache.Indexer
serviceAccounts cache.Store
secrets cache.Indexer
// Since we join two objects, we'll watch both of them with controllers.
serviceAccountController *framework.Controller
secretController *framework.Controller
// These are here so tests can inject a 'return true'.
serviceAccountsSynced func() bool
secretsSynced func() bool
// syncServiceAccountQueue handles service account events:
// * ensures a referenced token exists for service accounts which still exist
// * ensures tokens are removed for service accounts which no longer exist
// key is "<namespace>/<name>/<uid>"
syncServiceAccountQueue workqueue.RateLimitingInterface
// syncSecretQueue handles secret events:
// * deletes tokens whose service account no longer exists
// * updates tokens with missing token or namespace data, or mismatched ca data
// * ensures service account secret references are removed for tokens which are deleted
// key is a secretQueueKey{}
syncSecretQueue workqueue.RateLimitingInterface
maxRetries int
}
// Runs controller loops and returns immediately
func (e *TokensController) Run() {
if e.stopChan == nil {
e.stopChan = make(chan struct{})
go e.serviceAccountController.Run(e.stopChan)
go e.secretController.Run(e.stopChan)
// Runs controller blocks until stopCh is closed
func (e *TokensController) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
// Start controllers (to fill stores, call informers, fill work queues)
go e.serviceAccountController.Run(stopCh)
go e.secretController.Run(stopCh)
// Wait for stores to fill
for !e.serviceAccountController.HasSynced() || !e.secretController.HasSynced() {
time.Sleep(100 * time.Millisecond)
}
// Spawn workers to process work queues
for i := 0; i < workers; i++ {
go wait.Until(e.syncServiceAccount, 0, stopCh)
go wait.Until(e.syncSecret, 0, stopCh)
}
// Block until stop channel is closed
<-stopCh
// Shut down queues
e.syncServiceAccountQueue.ShutDown()
e.syncSecretQueue.ShutDown()
}
func (e *TokensController) queueServiceAccountSync(obj interface{}) {
if serviceAccount, ok := obj.(*api.ServiceAccount); ok {
e.syncServiceAccountQueue.Add(makeServiceAccountKey(serviceAccount))
}
}
// Stop gracefully shuts down this controller
func (e *TokensController) Stop() {
if e.stopChan != nil {
close(e.stopChan)
e.stopChan = nil
func (e *TokensController) queueServiceAccountUpdateSync(oldObj interface{}, newObj interface{}) {
if serviceAccount, ok := newObj.(*api.ServiceAccount); ok {
e.syncServiceAccountQueue.Add(makeServiceAccountKey(serviceAccount))
}
}
// serviceAccountAdded reacts to a ServiceAccount creation by creating a corresponding ServiceAccountToken Secret
func (e *TokensController) serviceAccountAdded(obj interface{}) {
serviceAccount := obj.(*api.ServiceAccount)
err := e.createSecretIfNeeded(serviceAccount)
if err != nil {
glog.Error(err)
}
}
// serviceAccountUpdated reacts to a ServiceAccount update (or re-list) by ensuring a corresponding ServiceAccountToken Secret exists
func (e *TokensController) serviceAccountUpdated(oldObj interface{}, newObj interface{}) {
newServiceAccount := newObj.(*api.ServiceAccount)
err := e.createSecretIfNeeded(newServiceAccount)
if err != nil {
glog.Error(err)
}
}
// serviceAccountDeleted reacts to a ServiceAccount deletion by deleting all corresponding ServiceAccountToken Secrets
func (e *TokensController) serviceAccountDeleted(obj interface{}) {
serviceAccount, ok := obj.(*api.ServiceAccount)
if !ok {
// Unknown type. If we missed a ServiceAccount deletion, the
// corresponding secrets will be cleaned up during the Secret re-list
// complete optionally requeues key, then calls queue.Done(key)
func (e *TokensController) retryOrForget(queue workqueue.RateLimitingInterface, key interface{}, requeue bool) {
if !requeue {
queue.Forget(key)
return
}
secrets, err := e.listTokenSecrets(serviceAccount)
requeueCount := queue.NumRequeues(key)
if requeueCount < e.maxRetries {
queue.AddRateLimited(key)
return
}
glog.V(4).Infof("retried %d times: %#v", requeueCount, key)
queue.Forget(key)
}
func (e *TokensController) queueSecretSync(obj interface{}) {
if secret, ok := obj.(*api.Secret); ok {
e.syncSecretQueue.Add(makeSecretQueueKey(secret))
}
}
func (e *TokensController) queueSecretUpdateSync(oldObj interface{}, newObj interface{}) {
if secret, ok := newObj.(*api.Secret); ok {
e.syncSecretQueue.Add(makeSecretQueueKey(secret))
}
}
func (e *TokensController) syncServiceAccount() {
key, quit := e.syncServiceAccountQueue.Get()
if quit {
return
}
defer e.syncServiceAccountQueue.Done(key)
retry := false
defer func() {
e.retryOrForget(e.syncServiceAccountQueue, key, retry)
}()
saInfo, err := parseServiceAccountKey(key)
if err != nil {
glog.Error(err)
return
}
for _, secret := range secrets {
glog.V(4).Infof("Deleting secret %s/%s because service account %s was deleted", secret.Namespace, secret.Name, serviceAccount.Name)
if err := e.deleteSecret(secret); err != nil {
glog.Errorf("Error deleting secret %s/%s: %v", secret.Namespace, secret.Name, err)
sa, err := e.getServiceAccount(saInfo.namespace, saInfo.name, saInfo.uid, false)
switch {
case err != nil:
glog.Error(err)
retry = true
case sa == nil:
// service account no longer exists, so delete related tokens
glog.V(4).Infof("syncServiceAccount(%s/%s), service account deleted, removing tokens", saInfo.namespace, saInfo.name)
sa = &api.ServiceAccount{ObjectMeta: api.ObjectMeta{Namespace: saInfo.namespace, Name: saInfo.name, UID: saInfo.uid}}
if retriable, err := e.deleteTokens(sa); err != nil {
glog.Errorf("error deleting serviceaccount tokens for %s/%s: %v", saInfo.namespace, saInfo.name, err)
retry = retriable
}
default:
// ensure a token exists and is referenced by this service account
if retriable, err := e.ensureReferencedToken(sa); err != nil {
glog.Errorf("error synchronizing serviceaccount %s/%s: %v", saInfo.namespace, saInfo.name, err)
retry = retriable
}
}
}
// secretAdded reacts to a Secret create by ensuring the referenced ServiceAccount exists, and by adding a token to the secret if needed
func (e *TokensController) secretAdded(obj interface{}) {
secret := obj.(*api.Secret)
serviceAccount, err := e.getServiceAccount(secret, true)
func (e *TokensController) syncSecret() {
key, quit := e.syncSecretQueue.Get()
if quit {
return
}
defer e.syncSecretQueue.Done(key)
// Track whether or not we should retry this sync
retry := false
defer func() {
e.retryOrForget(e.syncSecretQueue, key, retry)
}()
secretInfo, err := parseSecretQueueKey(key)
if err != nil {
glog.Error(err)
return
}
if serviceAccount == nil {
glog.V(2).Infof(
"Deleting new secret %s/%s because service account %s (uid=%s) was not found",
secret.Namespace, secret.Name,
secret.Annotations[api.ServiceAccountNameKey], secret.Annotations[api.ServiceAccountUIDKey])
if err := e.deleteSecret(secret); err != nil {
glog.Errorf("Error deleting secret %s/%s: %v", secret.Namespace, secret.Name, err)
}
} else {
e.generateTokenIfNeeded(serviceAccount, secret)
}
}
// secretUpdated reacts to a Secret update (or re-list) by deleting the secret (if the referenced ServiceAccount does not exist)
func (e *TokensController) secretUpdated(oldObj interface{}, newObj interface{}) {
newSecret := newObj.(*api.Secret)
newServiceAccount, err := e.getServiceAccount(newSecret, true)
if err != nil {
secret, err := e.getSecret(secretInfo.namespace, secretInfo.name, secretInfo.uid, false)
switch {
case err != nil:
glog.Error(err)
return
}
if newServiceAccount == nil {
glog.V(2).Infof(
"Deleting updated secret %s/%s because service account %s (uid=%s) was not found",
newSecret.Namespace, newSecret.Name,
newSecret.Annotations[api.ServiceAccountNameKey], newSecret.Annotations[api.ServiceAccountUIDKey])
if err := e.deleteSecret(newSecret); err != nil {
glog.Errorf("Error deleting secret %s/%s: %v", newSecret.Namespace, newSecret.Name, err)
retry = true
case secret == nil:
// If the service account exists
if sa, saErr := e.getServiceAccount(secretInfo.namespace, secretInfo.saName, secretInfo.saUID, false); saErr == nil && sa != nil {
// secret no longer exists, so delete references to this secret from the service account
if err := client.RetryOnConflict(RemoveTokenBackoff, func() error {
return e.removeSecretReference(secretInfo.namespace, secretInfo.saName, secretInfo.saUID, secretInfo.name)
}); err != nil {
glog.Error(err)
}
}
default:
// Ensure service account exists
sa, saErr := e.getServiceAccount(secretInfo.namespace, secretInfo.saName, secretInfo.saUID, true)
switch {
case saErr != nil:
glog.Error(saErr)
retry = true
case sa == nil:
// Delete token
glog.V(4).Infof("syncSecret(%s/%s), service account does not exist, deleting token", secretInfo.namespace, secretInfo.name)
if retriable, err := e.deleteToken(secretInfo.namespace, secretInfo.name, secretInfo.uid); err != nil {
glog.Errorf("error deleting serviceaccount token %s/%s for service account %s: %v", secretInfo.namespace, secretInfo.name, secretInfo.saName, err)
retry = retriable
}
default:
// Update token if needed
if retriable, err := e.generateTokenIfNeeded(sa, secret); err != nil {
glog.Errorf("error populating serviceaccount token %s/%s for service account %s: %v", secretInfo.namespace, secretInfo.name, secretInfo.saName, err)
retry = retriable
}
}
} else {
e.generateTokenIfNeeded(newServiceAccount, newSecret)
}
}
// secretDeleted reacts to a Secret being deleted by removing a reference from the corresponding ServiceAccount if needed
func (e *TokensController) secretDeleted(obj interface{}) {
secret, ok := obj.(*api.Secret)
if !ok {
// Unknown type. If we missed a Secret deletion, the corresponding ServiceAccount (if it exists)
// will get a secret recreated (if needed) during the ServiceAccount re-list
return
}
serviceAccount, err := e.getServiceAccount(secret, false)
func (e *TokensController) deleteTokens(serviceAccount *api.ServiceAccount) ( /*retry*/ bool, error) {
tokens, err := e.listTokenSecrets(serviceAccount)
if err != nil {
glog.Error(err)
return
// don't retry on cache lookup errors
return false, err
}
if serviceAccount == nil {
return
}
if err := client.RetryOnConflict(RemoveTokenBackoff, func() error {
return e.removeSecretReferenceIfNeeded(serviceAccount, secret.Name)
}); err != nil {
utilruntime.HandleError(err)
retry := false
errs := []error{}
for _, token := range tokens {
r, err := e.deleteToken(token.Namespace, token.Name, token.UID)
if err != nil {
errs = append(errs, err)
}
if r {
retry = true
}
}
return retry, utilerrors.NewAggregate(errs)
}
// createSecretIfNeeded makes sure at least one ServiceAccountToken secret exists, and is included in the serviceAccount's Secrets list
func (e *TokensController) createSecretIfNeeded(serviceAccount *api.ServiceAccount) error {
// If the service account references no secrets, short-circuit and create a new one
if len(serviceAccount.Secrets) == 0 {
return e.createSecret(serviceAccount)
func (e *TokensController) deleteToken(ns, name string, uid types.UID) ( /*retry*/ bool, error) {
var opts *api.DeleteOptions
if len(uid) > 0 {
opts = &api.DeleteOptions{Preconditions: &api.Preconditions{UID: &uid}}
}
err := e.client.Core().Secrets(ns).Delete(name, opts)
// NotFound doesn't need a retry (it's already been deleted)
// Conflict doesn't need a retry (the UID precondition failed)
if err == nil || apierrors.IsNotFound(err) || apierrors.IsConflict(err) {
return false, nil
}
// Retry for any other error
return true, err
}
// We shouldn't try to validate secret references until the secrets store is synced
if !e.secretsSynced() {
return nil
}
// If any existing token secrets are referenced by the service account, return
allSecrets, err := e.listTokenSecrets(serviceAccount)
if err != nil {
return err
}
referencedSecrets := getSecretReferences(serviceAccount)
for _, secret := range allSecrets {
if referencedSecrets.Has(secret.Name) {
return nil
// ensureReferencedToken makes sure at least one ServiceAccountToken secret exists, and is included in the serviceAccount's Secrets list
func (e *TokensController) ensureReferencedToken(serviceAccount *api.ServiceAccount) ( /* retry */ bool, error) {
if len(serviceAccount.Secrets) > 0 {
allSecrets, err := e.listTokenSecrets(serviceAccount)
if err != nil {
// Don't retry cache lookup errors
return false, err
}
referencedSecrets := getSecretReferences(serviceAccount)
for _, secret := range allSecrets {
if referencedSecrets.Has(secret.Name) {
// A service account token already exists, and is referenced, short-circuit
return false, nil
}
}
}
// Otherwise create a new token secret
return e.createSecret(serviceAccount)
}
// createSecret creates a secret of type ServiceAccountToken for the given ServiceAccount
func (e *TokensController) createSecret(serviceAccount *api.ServiceAccount) error {
// We don't want to update the cache's copy of the service account
// so add the secret to a freshly retrieved copy of the service account
serviceAccounts := e.client.Core().ServiceAccounts(serviceAccount.Namespace)
liveServiceAccount, err := serviceAccounts.Get(serviceAccount.Name)
if err != nil {
return err
// Retry for any error other than a NotFound
return !apierrors.IsNotFound(err), err
}
if liveServiceAccount.ResourceVersion != serviceAccount.ResourceVersion {
// our view of the service account is not up to date
// we'll get notified of an update event later and get to try again
// this only prevent interactions between successive runs of this controller's event handlers, but that is useful
glog.V(2).Infof("View of ServiceAccount %s/%s is not up to date, skipping token creation", serviceAccount.Namespace, serviceAccount.Name)
return nil
glog.V(2).Infof("serviceaccount %s/%s is not up to date, skipping token creation", serviceAccount.Namespace, serviceAccount.Name)
return false, nil
}
// Build the secret
@ -325,7 +413,8 @@ func (e *TokensController) createSecret(serviceAccount *api.ServiceAccount) erro
// Generate the token
token, err := e.token.GenerateToken(*serviceAccount, *secret)
if err != nil {
return err
// retriable error
return true, err
}
secret.Data[api.ServiceAccountTokenKey] = []byte(token)
secret.Data[api.ServiceAccountNamespaceKey] = []byte(serviceAccount.Namespace)
@ -334,41 +423,39 @@ func (e *TokensController) createSecret(serviceAccount *api.ServiceAccount) erro
}
// Save the secret
if createdToken, err := e.client.Core().Secrets(serviceAccount.Namespace).Create(secret); err != nil {
return err
} else {
// Manually add the new token to the cache store.
// This prevents the service account update (below) triggering another token creation, if the referenced token couldn't be found in the store
e.secrets.Add(createdToken)
createdToken, err := e.client.Core().Secrets(serviceAccount.Namespace).Create(secret)
if err != nil {
// retriable error
return true, err
}
// Manually add the new token to the cache store.
// This prevents the service account update (below) triggering another token creation, if the referenced token couldn't be found in the store
e.secrets.Add(createdToken)
liveServiceAccount.Secrets = append(liveServiceAccount.Secrets, api.ObjectReference{Name: secret.Name})
_, err = serviceAccounts.Update(liveServiceAccount)
if err != nil {
if _, err = serviceAccounts.Update(liveServiceAccount); err != nil {
// we weren't able to use the token, try to clean it up.
glog.V(2).Infof("Deleting secret %s/%s because reference couldn't be added (%v)", secret.Namespace, secret.Name, err)
if err := e.client.Core().Secrets(secret.Namespace).Delete(secret.Name, nil); err != nil {
glog.Error(err) // if we fail, just log it
glog.V(2).Infof("deleting secret %s/%s because reference couldn't be added (%v)", secret.Namespace, secret.Name, err)
deleteOpts := &api.DeleteOptions{Preconditions: &api.Preconditions{UID: &createdToken.UID}}
if deleteErr := e.client.Core().Secrets(createdToken.Namespace).Delete(createdToken.Name, deleteOpts); deleteErr != nil {
glog.Error(deleteErr) // if we fail, just log it
}
}
if apierrors.IsConflict(err) {
// nothing to do. We got a conflict, that means that the service account was updated. We simply need to return because we'll get an update notification later
return nil
if apierrors.IsConflict(err) || apierrors.IsNotFound(err) {
// if we got a Conflict error, the service account was updated by someone else, and we'll get an update notification later
// if we got a NotFound error, the service account no longer exists, and we don't need to create a token for it
return false, nil
}
// retry in all other cases
return true, err
}
return err
// success!
return false, nil
}
// generateTokenIfNeeded populates the token data for the given Secret if not already set
func (e *TokensController) generateTokenIfNeeded(serviceAccount *api.ServiceAccount, secret *api.Secret) error {
if secret.Annotations == nil {
secret.Annotations = map[string]string{}
}
if secret.Data == nil {
secret.Data = map[string][]byte{}
}
func (e *TokensController) secretUpdateNeeded(secret *api.Secret) (bool, bool, bool) {
caData := secret.Data[api.ServiceAccountRootCAKey]
needsCA := len(e.rootCA) > 0 && bytes.Compare(caData, e.rootCA) != 0
@ -377,60 +464,103 @@ func (e *TokensController) generateTokenIfNeeded(serviceAccount *api.ServiceAcco
tokenData := secret.Data[api.ServiceAccountTokenKey]
needsToken := len(tokenData) == 0
return needsCA, needsNamespace, needsToken
}
// generateTokenIfNeeded populates the token data for the given Secret if not already set
func (e *TokensController) generateTokenIfNeeded(serviceAccount *api.ServiceAccount, cachedSecret *api.Secret) ( /* retry */ bool, error) {
// Check the cached secret to see if changes are needed
if needsCA, needsNamespace, needsToken := e.secretUpdateNeeded(cachedSecret); !needsCA && !needsToken && !needsNamespace {
return false, nil
}
// We don't want to update the cache's copy of the secret
// so add the token to a freshly retrieved copy of the secret
secrets := e.client.Core().Secrets(cachedSecret.Namespace)
liveSecret, err := secrets.Get(cachedSecret.Name)
if err != nil {
// Retry for any error other than a NotFound
return !apierrors.IsNotFound(err), err
}
if liveSecret.ResourceVersion != cachedSecret.ResourceVersion {
// our view of the secret is not up to date
// we'll get notified of an update event later and get to try again
glog.V(2).Infof("secret %s/%s is not up to date, skipping token population", liveSecret.Namespace, liveSecret.Name)
return false, nil
}
needsCA, needsNamespace, needsToken := e.secretUpdateNeeded(liveSecret)
if !needsCA && !needsToken && !needsNamespace {
return nil
return false, nil
}
if liveSecret.Annotations == nil {
liveSecret.Annotations = map[string]string{}
}
if liveSecret.Data == nil {
liveSecret.Data = map[string][]byte{}
}
// Set the CA
if needsCA {
secret.Data[api.ServiceAccountRootCAKey] = e.rootCA
liveSecret.Data[api.ServiceAccountRootCAKey] = e.rootCA
}
// Set the namespace
if needsNamespace {
secret.Data[api.ServiceAccountNamespaceKey] = []byte(secret.Namespace)
liveSecret.Data[api.ServiceAccountNamespaceKey] = []byte(liveSecret.Namespace)
}
// Generate the token
if needsToken {
token, err := e.token.GenerateToken(*serviceAccount, *secret)
token, err := e.token.GenerateToken(*serviceAccount, *liveSecret)
if err != nil {
return err
return false, err
}
secret.Data[api.ServiceAccountTokenKey] = []byte(token)
liveSecret.Data[api.ServiceAccountTokenKey] = []byte(token)
}
// Set annotations
secret.Annotations[api.ServiceAccountNameKey] = serviceAccount.Name
secret.Annotations[api.ServiceAccountUIDKey] = string(serviceAccount.UID)
liveSecret.Annotations[api.ServiceAccountNameKey] = serviceAccount.Name
liveSecret.Annotations[api.ServiceAccountUIDKey] = string(serviceAccount.UID)
// Save the secret
if _, err := e.client.Core().Secrets(secret.Namespace).Update(secret); err != nil {
return err
_, err = secrets.Update(liveSecret)
if apierrors.IsConflict(err) || apierrors.IsNotFound(err) {
// if we got a Conflict error, the secret was updated by someone else, and we'll get an update notification later
// if we got a NotFound error, the secret no longer exists, and we don't need to populate a token
return false, nil
}
return nil
if err != nil {
return true, err
}
return false, nil
}
// deleteSecret deletes the given secret
func (e *TokensController) deleteSecret(secret *api.Secret) error {
return e.client.Core().Secrets(secret.Namespace).Delete(secret.Name, nil)
}
// removeSecretReferenceIfNeeded updates the given ServiceAccount to remove a reference to the given secretName if needed.
// Returns whether an update was performed, and any error that occurred
func (e *TokensController) removeSecretReferenceIfNeeded(serviceAccount *api.ServiceAccount, secretName string) error {
// removeSecretReference updates the given ServiceAccount to remove a reference to the given secretName if needed.
func (e *TokensController) removeSecretReference(saNamespace string, saName string, saUID types.UID, secretName string) error {
// We don't want to update the cache's copy of the service account
// so remove the secret from a freshly retrieved copy of the service account
serviceAccounts := e.client.Core().ServiceAccounts(serviceAccount.Namespace)
serviceAccount, err := serviceAccounts.Get(serviceAccount.Name)
serviceAccounts := e.client.Core().ServiceAccounts(saNamespace)
serviceAccount, err := serviceAccounts.Get(saName)
// Ignore NotFound errors when attempting to remove a reference
if apierrors.IsNotFound(err) {
return nil
}
if err != nil {
return err
}
// Double-check to see if the account still references the secret
// Short-circuit if the UID doesn't match
if len(saUID) > 0 && saUID != serviceAccount.UID {
return nil
}
// Short-circuit if the secret is no longer referenced
if !getSecretReferences(serviceAccount).Has(secretName) {
return nil
}
// Remove the secret
secrets := []api.ObjectReference{}
for _, s := range serviceAccount.Secrets {
if s.Name != secretName {
@ -438,59 +568,90 @@ func (e *TokensController) removeSecretReferenceIfNeeded(serviceAccount *api.Ser
}
}
serviceAccount.Secrets = secrets
_, err = serviceAccounts.Update(serviceAccount)
if err != nil {
return err
// Ignore NotFound errors when attempting to remove a reference
if apierrors.IsNotFound(err) {
return nil
}
return nil
return err
}
// getServiceAccount returns the ServiceAccount referenced by the given secret. If the secret is not
// of type ServiceAccountToken, or if the referenced ServiceAccount does not exist, nil is returned
func (e *TokensController) getServiceAccount(secret *api.Secret, fetchOnCacheMiss bool) (*api.ServiceAccount, error) {
name, _ := serviceAccountNameAndUID(secret)
if len(name) == 0 {
return nil, nil
}
key := &api.ServiceAccount{ObjectMeta: api.ObjectMeta{Namespace: secret.Namespace}}
namespaceAccounts, err := e.serviceAccounts.Index("namespace", key)
func (e *TokensController) getServiceAccount(ns string, name string, uid types.UID, fetchOnCacheMiss bool) (*api.ServiceAccount, error) {
// Look up in cache
obj, exists, err := e.serviceAccounts.GetByKey(makeCacheKey(ns, name))
if err != nil {
return nil, err
}
for _, obj := range namespaceAccounts {
serviceAccount := obj.(*api.ServiceAccount)
if serviceaccount.IsServiceAccountToken(secret, serviceAccount) {
return serviceAccount, nil
if exists {
sa, ok := obj.(*api.ServiceAccount)
if !ok {
return nil, fmt.Errorf("expected *api.ServiceAccount, got %#v", sa)
}
// Ensure UID matches if given
if len(uid) == 0 || uid == sa.UID {
return sa, nil
}
}
if fetchOnCacheMiss {
serviceAccount, err := e.client.Core().ServiceAccounts(secret.Namespace).Get(name)
if apierrors.IsNotFound(err) {
return nil, nil
}
if err != nil {
return nil, err
}
if !fetchOnCacheMiss {
return nil, nil
}
if serviceaccount.IsServiceAccountToken(secret, serviceAccount) {
return serviceAccount, nil
// Live lookup
sa, err := e.client.Core().ServiceAccounts(ns).Get(name)
if apierrors.IsNotFound(err) {
return nil, nil
}
if err != nil {
return nil, err
}
// Ensure UID matches if given
if len(uid) == 0 || uid == sa.UID {
return sa, nil
}
return nil, nil
}
func (e *TokensController) getSecret(ns string, name string, uid types.UID, fetchOnCacheMiss bool) (*api.Secret, error) {
// Look up in cache
obj, exists, err := e.secrets.GetByKey(makeCacheKey(ns, name))
if err != nil {
return nil, err
}
if exists {
secret, ok := obj.(*api.Secret)
if !ok {
return nil, fmt.Errorf("expected *api.Secret, got %#v", secret)
}
// Ensure UID matches if given
if len(uid) == 0 || uid == secret.UID {
return secret, nil
}
}
if !fetchOnCacheMiss {
return nil, nil
}
// Live lookup
secret, err := e.client.Core().Secrets(ns).Get(name)
if apierrors.IsNotFound(err) {
return nil, nil
}
if err != nil {
return nil, err
}
// Ensure UID matches if given
if len(uid) == 0 || uid == secret.UID {
return secret, nil
}
return nil, nil
}
// listTokenSecrets returns a list of all of the ServiceAccountToken secrets that
// reference the given service account's name and uid
func (e *TokensController) listTokenSecrets(serviceAccount *api.ServiceAccount) ([]*api.Secret, error) {
key := &api.Secret{ObjectMeta: api.ObjectMeta{Namespace: serviceAccount.Namespace}}
namespaceSecrets, err := e.secrets.Index("namespace", key)
namespaceSecrets, err := e.secrets.ByIndex("namespace", serviceAccount.Namespace)
if err != nil {
return nil, err
}
@ -523,3 +684,63 @@ func getSecretReferences(serviceAccount *api.ServiceAccount) sets.String {
}
return references
}
// serviceAccountQueueKey holds information we need to sync a service account.
// It contains enough information to look up the cached service account,
// or delete owned tokens if the service account no longer exists.
type serviceAccountQueueKey struct {
namespace string
name string
uid types.UID
}
func makeServiceAccountKey(sa *api.ServiceAccount) interface{} {
return serviceAccountQueueKey{
namespace: sa.Namespace,
name: sa.Name,
uid: sa.UID,
}
}
func parseServiceAccountKey(key interface{}) (serviceAccountQueueKey, error) {
queueKey, ok := key.(serviceAccountQueueKey)
if !ok || len(queueKey.namespace) == 0 || len(queueKey.name) == 0 || len(queueKey.uid) == 0 {
return serviceAccountQueueKey{}, fmt.Errorf("invalid serviceaccount key: %#v", key)
}
return queueKey, nil
}
// secretQueueKey holds information we need to sync a service account token secret.
// It contains enough information to look up the cached service account,
// or delete the secret reference if the secret no longer exists.
type secretQueueKey struct {
namespace string
name string
uid types.UID
saName string
// optional, will be blank when syncing tokens missing the service account uid annotation
saUID types.UID
}
func makeSecretQueueKey(secret *api.Secret) interface{} {
return secretQueueKey{
namespace: secret.Namespace,
name: secret.Name,
uid: secret.UID,
saName: secret.Annotations[api.ServiceAccountNameKey],
saUID: types.UID(secret.Annotations[api.ServiceAccountUIDKey]),
}
}
func parseSecretQueueKey(key interface{}) (secretQueueKey, error) {
queueKey, ok := key.(secretQueueKey)
if !ok || len(queueKey.namespace) == 0 || len(queueKey.name) == 0 || len(queueKey.uid) == 0 || len(queueKey.saName) == 0 {
return secretQueueKey{}, fmt.Errorf("invalid secret key: %#v", key)
}
return queueKey, nil
}
// produce the same key format as cache.MetaNamespaceKeyFunc
func makeCacheKey(namespace, name string) string {
return namespace + "/" + name
}

View File

@ -17,10 +17,15 @@ limitations under the License.
package serviceaccount
import (
"errors"
"reflect"
"testing"
"time"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
apierrors "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
"k8s.io/kubernetes/pkg/client/testing/core"
@ -63,7 +68,12 @@ func tokenSecretReferences() []api.ObjectReference {
// addTokenSecretReference adds a reference to the ServiceAccountToken that will be created
func addTokenSecretReference(refs []api.ObjectReference) []api.ObjectReference {
return append(refs, api.ObjectReference{Name: "default-token-fplln"})
return addNamedTokenSecretReference(refs, "default-token-fplln")
}
// addNamedTokenSecretReference adds a reference to the named ServiceAccountToken
func addNamedTokenSecretReference(refs []api.ObjectReference, name string) []api.ObjectReference {
return append(refs, api.ObjectReference{Name: name})
}
// serviceAccount returns a service account with the given secret refs
@ -104,10 +114,15 @@ func opaqueSecret() *api.Secret {
// createdTokenSecret returns the ServiceAccountToken secret posted when creating a new token secret.
// Named "default-token-fplln", since that is the first generated name after rand.Seed(1)
func createdTokenSecret() *api.Secret {
func createdTokenSecret(overrideName ...string) *api.Secret {
return namedCreatedTokenSecret("default-token-fplln")
}
// namedTokenSecret returns the ServiceAccountToken secret posted when creating a new token secret with the given name.
func namedCreatedTokenSecret(name string) *api.Secret {
return &api.Secret{
ObjectMeta: api.ObjectMeta{
Name: "default-token-fplln",
Name: name,
Namespace: "default",
Annotations: map[string]string{
api.ServiceAccountNameKey: "default",
@ -180,12 +195,20 @@ func serviceAccountTokenSecretWithNamespaceData(data []byte) *api.Secret {
return secret
}
type reaction struct {
verb string
resource string
reactor func(t *testing.T) core.ReactionFunc
}
func TestTokenCreation(t *testing.T) {
testcases := map[string]struct {
ClientObjects []runtime.Object
SecretsSyncPending bool
ServiceAccountsSyncPending bool
IsAsync bool
MaxRetries int
Reactors []reaction
ExistingServiceAccount *api.ServiceAccount
ExistingSecrets []*api.Secret
@ -209,16 +232,66 @@ func TestTokenCreation(t *testing.T) {
core.NewUpdateAction(unversioned.GroupVersionResource{Resource: "serviceaccounts"}, api.NamespaceDefault, serviceAccount(addTokenSecretReference(emptySecretReferences()))),
},
},
"new serviceaccount with no secrets with unsynced secret store": {
"new serviceaccount with no secrets encountering create error": {
ClientObjects: []runtime.Object{serviceAccount(emptySecretReferences()), createdTokenSecret()},
SecretsSyncPending: true,
MaxRetries: 10,
IsAsync: true,
Reactors: []reaction{{
verb: "create",
resource: "secrets",
reactor: func(t *testing.T) core.ReactionFunc {
i := 0
return func(core.Action) (bool, runtime.Object, error) {
i++
if i < 3 {
return true, nil, apierrors.NewForbidden(api.Resource("secrets"), "foo", errors.New("No can do"))
}
return false, nil, nil
}
},
}},
AddedServiceAccount: serviceAccount(emptySecretReferences()),
ExpectedActions: []core.Action{
// Attempt 1
core.NewGetAction(unversioned.GroupVersionResource{Resource: "serviceaccounts"}, api.NamespaceDefault, "default"),
core.NewCreateAction(unversioned.GroupVersionResource{Resource: "secrets"}, api.NamespaceDefault, createdTokenSecret()),
core.NewUpdateAction(unversioned.GroupVersionResource{Resource: "serviceaccounts"}, api.NamespaceDefault, serviceAccount(addTokenSecretReference(emptySecretReferences()))),
// Attempt 2
core.NewGetAction(unversioned.GroupVersionResource{Resource: "serviceaccounts"}, api.NamespaceDefault, "default"),
core.NewCreateAction(unversioned.GroupVersionResource{Resource: "secrets"}, api.NamespaceDefault, namedCreatedTokenSecret("default-token-gziey")),
// Attempt 3
core.NewGetAction(unversioned.GroupVersionResource{Resource: "serviceaccounts"}, api.NamespaceDefault, "default"),
core.NewCreateAction(unversioned.GroupVersionResource{Resource: "secrets"}, api.NamespaceDefault, namedCreatedTokenSecret("default-token-oh43e")),
core.NewUpdateAction(unversioned.GroupVersionResource{Resource: "serviceaccounts"}, api.NamespaceDefault, serviceAccount(addNamedTokenSecretReference(emptySecretReferences(), "default-token-oh43e"))),
},
},
"new serviceaccount with no secrets encountering unending create error": {
ClientObjects: []runtime.Object{serviceAccount(emptySecretReferences()), createdTokenSecret()},
MaxRetries: 2,
IsAsync: true,
Reactors: []reaction{{
verb: "create",
resource: "secrets",
reactor: func(t *testing.T) core.ReactionFunc {
return func(core.Action) (bool, runtime.Object, error) {
return true, nil, apierrors.NewForbidden(api.Resource("secrets"), "foo", errors.New("No can do"))
}
},
}},
AddedServiceAccount: serviceAccount(emptySecretReferences()),
ExpectedActions: []core.Action{
// Attempt
core.NewGetAction(unversioned.GroupVersionResource{Resource: "serviceaccounts"}, api.NamespaceDefault, "default"),
core.NewCreateAction(unversioned.GroupVersionResource{Resource: "secrets"}, api.NamespaceDefault, createdTokenSecret()),
// Retry 1
core.NewGetAction(unversioned.GroupVersionResource{Resource: "serviceaccounts"}, api.NamespaceDefault, "default"),
core.NewCreateAction(unversioned.GroupVersionResource{Resource: "secrets"}, api.NamespaceDefault, namedCreatedTokenSecret("default-token-gziey")),
// Retry 2
core.NewGetAction(unversioned.GroupVersionResource{Resource: "serviceaccounts"}, api.NamespaceDefault, "default"),
core.NewCreateAction(unversioned.GroupVersionResource{Resource: "secrets"}, api.NamespaceDefault, namedCreatedTokenSecret("default-token-oh43e")),
},
},
"new serviceaccount with missing secrets": {
@ -231,14 +304,6 @@ func TestTokenCreation(t *testing.T) {
core.NewUpdateAction(unversioned.GroupVersionResource{Resource: "serviceaccounts"}, api.NamespaceDefault, serviceAccount(addTokenSecretReference(missingSecretReferences()))),
},
},
"new serviceaccount with missing secrets with unsynced secret store": {
ClientObjects: []runtime.Object{serviceAccount(missingSecretReferences()), createdTokenSecret()},
SecretsSyncPending: true,
AddedServiceAccount: serviceAccount(missingSecretReferences()),
ExpectedActions: []core.Action{},
},
"new serviceaccount with non-token secrets": {
ClientObjects: []runtime.Object{serviceAccount(regularSecretReferences()), createdTokenSecret(), opaqueSecret()},
@ -275,18 +340,6 @@ func TestTokenCreation(t *testing.T) {
core.NewUpdateAction(unversioned.GroupVersionResource{Resource: "serviceaccounts"}, api.NamespaceDefault, serviceAccount(addTokenSecretReference(emptySecretReferences()))),
},
},
"updated serviceaccount with no secrets with unsynced secret store": {
ClientObjects: []runtime.Object{serviceAccount(emptySecretReferences()), createdTokenSecret()},
SecretsSyncPending: true,
UpdatedServiceAccount: serviceAccount(emptySecretReferences()),
ExpectedActions: []core.Action{
core.NewGetAction(unversioned.GroupVersionResource{Resource: "serviceaccounts"}, api.NamespaceDefault, "default"),
core.NewCreateAction(unversioned.GroupVersionResource{Resource: "secrets"}, api.NamespaceDefault, createdTokenSecret()),
core.NewUpdateAction(unversioned.GroupVersionResource{Resource: "serviceaccounts"}, api.NamespaceDefault, serviceAccount(addTokenSecretReference(emptySecretReferences()))),
},
},
"updated serviceaccount with missing secrets": {
ClientObjects: []runtime.Object{serviceAccount(missingSecretReferences()), createdTokenSecret()},
@ -297,14 +350,6 @@ func TestTokenCreation(t *testing.T) {
core.NewUpdateAction(unversioned.GroupVersionResource{Resource: "serviceaccounts"}, api.NamespaceDefault, serviceAccount(addTokenSecretReference(missingSecretReferences()))),
},
},
"updated serviceaccount with missing secrets with unsynced secret store": {
ClientObjects: []runtime.Object{serviceAccount(missingSecretReferences()), createdTokenSecret()},
SecretsSyncPending: true,
UpdatedServiceAccount: serviceAccount(missingSecretReferences()),
ExpectedActions: []core.Action{},
},
"updated serviceaccount with non-token secrets": {
ClientObjects: []runtime.Object{serviceAccount(regularSecretReferences()), createdTokenSecret(), opaqueSecret()},
@ -375,6 +420,7 @@ func TestTokenCreation(t *testing.T) {
AddedSecret: serviceAccountTokenSecretWithoutTokenData(),
ExpectedActions: []core.Action{
core.NewGetAction(unversioned.GroupVersionResource{Resource: "secrets"}, api.NamespaceDefault, "token-secret-1"),
core.NewUpdateAction(unversioned.GroupVersionResource{Resource: "secrets"}, api.NamespaceDefault, serviceAccountTokenSecret()),
},
},
@ -384,6 +430,7 @@ func TestTokenCreation(t *testing.T) {
AddedSecret: serviceAccountTokenSecretWithoutCAData(),
ExpectedActions: []core.Action{
core.NewGetAction(unversioned.GroupVersionResource{Resource: "secrets"}, api.NamespaceDefault, "token-secret-1"),
core.NewUpdateAction(unversioned.GroupVersionResource{Resource: "secrets"}, api.NamespaceDefault, serviceAccountTokenSecret()),
},
},
@ -393,6 +440,7 @@ func TestTokenCreation(t *testing.T) {
AddedSecret: serviceAccountTokenSecretWithCAData([]byte("mismatched")),
ExpectedActions: []core.Action{
core.NewGetAction(unversioned.GroupVersionResource{Resource: "secrets"}, api.NamespaceDefault, "token-secret-1"),
core.NewUpdateAction(unversioned.GroupVersionResource{Resource: "secrets"}, api.NamespaceDefault, serviceAccountTokenSecret()),
},
},
@ -402,6 +450,7 @@ func TestTokenCreation(t *testing.T) {
AddedSecret: serviceAccountTokenSecretWithoutNamespaceData(),
ExpectedActions: []core.Action{
core.NewGetAction(unversioned.GroupVersionResource{Resource: "secrets"}, api.NamespaceDefault, "token-secret-1"),
core.NewUpdateAction(unversioned.GroupVersionResource{Resource: "secrets"}, api.NamespaceDefault, serviceAccountTokenSecret()),
},
},
@ -436,6 +485,7 @@ func TestTokenCreation(t *testing.T) {
UpdatedSecret: serviceAccountTokenSecretWithoutTokenData(),
ExpectedActions: []core.Action{
core.NewGetAction(unversioned.GroupVersionResource{Resource: "secrets"}, api.NamespaceDefault, "token-secret-1"),
core.NewUpdateAction(unversioned.GroupVersionResource{Resource: "secrets"}, api.NamespaceDefault, serviceAccountTokenSecret()),
},
},
@ -445,6 +495,7 @@ func TestTokenCreation(t *testing.T) {
UpdatedSecret: serviceAccountTokenSecretWithoutCAData(),
ExpectedActions: []core.Action{
core.NewGetAction(unversioned.GroupVersionResource{Resource: "secrets"}, api.NamespaceDefault, "token-secret-1"),
core.NewUpdateAction(unversioned.GroupVersionResource{Resource: "secrets"}, api.NamespaceDefault, serviceAccountTokenSecret()),
},
},
@ -454,6 +505,7 @@ func TestTokenCreation(t *testing.T) {
UpdatedSecret: serviceAccountTokenSecretWithCAData([]byte("mismatched")),
ExpectedActions: []core.Action{
core.NewGetAction(unversioned.GroupVersionResource{Resource: "secrets"}, api.NamespaceDefault, "token-secret-1"),
core.NewUpdateAction(unversioned.GroupVersionResource{Resource: "secrets"}, api.NamespaceDefault, serviceAccountTokenSecret()),
},
},
@ -463,6 +515,7 @@ func TestTokenCreation(t *testing.T) {
UpdatedSecret: serviceAccountTokenSecretWithoutNamespaceData(),
ExpectedActions: []core.Action{
core.NewGetAction(unversioned.GroupVersionResource{Resource: "secrets"}, api.NamespaceDefault, "token-secret-1"),
core.NewUpdateAction(unversioned.GroupVersionResource{Resource: "secrets"}, api.NamespaceDefault, serviceAccountTokenSecret()),
},
},
@ -501,6 +554,7 @@ func TestTokenCreation(t *testing.T) {
}
for k, tc := range testcases {
glog.Infof(k)
// Re-seed to reset name generation
utilrand.Seed(1)
@ -508,12 +562,11 @@ func TestTokenCreation(t *testing.T) {
generator := &testGenerator{Token: "ABC"}
client := fake.NewSimpleClientset(tc.ClientObjects...)
for _, reactor := range tc.Reactors {
client.Fake.PrependReactor(reactor.verb, reactor.resource, reactor.reactor(t))
}
controller := NewTokensController(client, TokensControllerOptions{TokenGenerator: generator, RootCA: []byte("CA Data")})
// Tell the token controller whether its stores have been synced
controller.serviceAccountsSynced = func() bool { return !tc.ServiceAccountsSyncPending }
controller.secretsSynced = func() bool { return !tc.SecretsSyncPending }
controller := NewTokensController(client, TokensControllerOptions{TokenGenerator: generator, RootCA: []byte("CA Data"), MaxRetries: tc.MaxRetries})
if tc.ExistingServiceAccount != nil {
controller.serviceAccounts.Add(tc.ExistingServiceAccount)
@ -523,22 +576,72 @@ func TestTokenCreation(t *testing.T) {
}
if tc.AddedServiceAccount != nil {
controller.serviceAccountAdded(tc.AddedServiceAccount)
controller.serviceAccounts.Add(tc.AddedServiceAccount)
controller.queueServiceAccountSync(tc.AddedServiceAccount)
}
if tc.UpdatedServiceAccount != nil {
controller.serviceAccountUpdated(nil, tc.UpdatedServiceAccount)
controller.serviceAccounts.Add(tc.UpdatedServiceAccount)
controller.queueServiceAccountUpdateSync(nil, tc.UpdatedServiceAccount)
}
if tc.DeletedServiceAccount != nil {
controller.serviceAccountDeleted(tc.DeletedServiceAccount)
controller.serviceAccounts.Delete(tc.DeletedServiceAccount)
controller.queueServiceAccountSync(tc.DeletedServiceAccount)
}
if tc.AddedSecret != nil {
controller.secretAdded(tc.AddedSecret)
controller.secrets.Add(tc.AddedSecret)
controller.queueSecretSync(tc.AddedSecret)
}
if tc.UpdatedSecret != nil {
controller.secretUpdated(nil, tc.UpdatedSecret)
controller.secrets.Add(tc.UpdatedSecret)
controller.queueSecretUpdateSync(nil, tc.UpdatedSecret)
}
if tc.DeletedSecret != nil {
controller.secretDeleted(tc.DeletedSecret)
controller.secrets.Delete(tc.DeletedSecret)
controller.queueSecretSync(tc.DeletedSecret)
}
// This is the longest we'll wait for async tests
timeout := time.Now().Add(30 * time.Second)
waitedForAdditionalActions := false
for {
if controller.syncServiceAccountQueue.Len() > 0 {
controller.syncServiceAccount()
}
if controller.syncSecretQueue.Len() > 0 {
controller.syncSecret()
}
// The queues still have things to work on
if controller.syncServiceAccountQueue.Len() > 0 || controller.syncSecretQueue.Len() > 0 {
continue
}
// If we expect this test to work asynchronously...
if tc.IsAsync {
// if we're still missing expected actions within our test timeout
if len(client.Actions()) < len(tc.ExpectedActions) && time.Now().Before(timeout) {
// wait for the expected actions (without hotlooping)
time.Sleep(time.Millisecond)
continue
}
// if we exactly match our expected actions, wait a bit to make sure no other additional actions show up
if len(client.Actions()) == len(tc.ExpectedActions) && !waitedForAdditionalActions {
time.Sleep(time.Second)
waitedForAdditionalActions = true
continue
}
}
break
}
if controller.syncServiceAccountQueue.Len() > 0 {
t.Errorf("%s: unexpected items in service account queue: %d", k, controller.syncServiceAccountQueue.Len())
}
if controller.syncSecretQueue.Len() > 0 {
t.Errorf("%s: unexpected items in secret queue: %d", k, controller.syncSecretQueue.Len())
}
actions := client.Actions()
@ -556,7 +659,10 @@ func TestTokenCreation(t *testing.T) {
}
if len(tc.ExpectedActions) > len(actions) {
t.Errorf("%s: %d additional expected actions:%+v", k, len(tc.ExpectedActions)-len(actions), tc.ExpectedActions[len(actions):])
t.Errorf("%s: %d additional expected actions", k, len(tc.ExpectedActions)-len(actions))
for _, a := range tc.ExpectedActions[len(actions):] {
t.Logf(" %+v", a)
}
}
}
}

View File

@ -415,15 +415,16 @@ func startServiceAccountTestServer(t *testing.T) (*clientset.Clientset, restclie
}
// Start the service account and service account token controllers
stopCh := make(chan struct{})
tokenController := serviceaccountcontroller.NewTokensController(rootClientset, serviceaccountcontroller.TokensControllerOptions{TokenGenerator: serviceaccount.JWTTokenGenerator(serviceAccountKey)})
tokenController.Run()
go tokenController.Run(1, stopCh)
serviceAccountController := serviceaccountcontroller.NewServiceAccountsController(rootClientset, serviceaccountcontroller.DefaultServiceAccountsControllerOptions())
serviceAccountController.Run()
// Start the admission plugin reflectors
serviceAccountAdmission.Run()
stop := func() {
tokenController.Stop()
close(stopCh)
serviceAccountController.Stop()
serviceAccountAdmission.Stop()
apiServer.Close()