mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-22 11:21:47 +00:00
Convert service account token controller to use a work queue
This commit is contained in:
parent
db4c943f6d
commit
f45d9dc2f8
@ -439,13 +439,13 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("Error reading key for service account token controller: %v", err)
|
glog.Errorf("Error reading key for service account token controller: %v", err)
|
||||||
} else {
|
} else {
|
||||||
serviceaccountcontroller.NewTokensController(
|
go serviceaccountcontroller.NewTokensController(
|
||||||
clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "tokens-controller")),
|
clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "tokens-controller")),
|
||||||
serviceaccountcontroller.TokensControllerOptions{
|
serviceaccountcontroller.TokensControllerOptions{
|
||||||
TokenGenerator: serviceaccount.JWTTokenGenerator(privateKey),
|
TokenGenerator: serviceaccount.JWTTokenGenerator(privateKey),
|
||||||
RootCA: rootCA,
|
RootCA: rootCA,
|
||||||
},
|
},
|
||||||
).Run()
|
).Run(int(s.ConcurrentSATokenSyncs), wait.NeverStop)
|
||||||
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
|
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -53,6 +53,7 @@ func NewCMServer() *CMServer {
|
|||||||
ConcurrentResourceQuotaSyncs: 5,
|
ConcurrentResourceQuotaSyncs: 5,
|
||||||
ConcurrentDeploymentSyncs: 5,
|
ConcurrentDeploymentSyncs: 5,
|
||||||
ConcurrentNamespaceSyncs: 2,
|
ConcurrentNamespaceSyncs: 2,
|
||||||
|
ConcurrentSATokenSyncs: 5,
|
||||||
LookupCacheSizeForRC: 4096,
|
LookupCacheSizeForRC: 4096,
|
||||||
LookupCacheSizeForRS: 4096,
|
LookupCacheSizeForRS: 4096,
|
||||||
LookupCacheSizeForDaemonSet: 1024,
|
LookupCacheSizeForDaemonSet: 1024,
|
||||||
@ -108,6 +109,7 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) {
|
|||||||
fs.Int32Var(&s.ConcurrentResourceQuotaSyncs, "concurrent-resource-quota-syncs", s.ConcurrentResourceQuotaSyncs, "The number of resource quotas that are allowed to sync concurrently. Larger number = more responsive quota management, but more CPU (and network) load")
|
fs.Int32Var(&s.ConcurrentResourceQuotaSyncs, "concurrent-resource-quota-syncs", s.ConcurrentResourceQuotaSyncs, "The number of resource quotas that are allowed to sync concurrently. Larger number = more responsive quota management, but more CPU (and network) load")
|
||||||
fs.Int32Var(&s.ConcurrentDeploymentSyncs, "concurrent-deployment-syncs", s.ConcurrentDeploymentSyncs, "The number of deployment objects that are allowed to sync concurrently. Larger number = more responsive deployments, but more CPU (and network) load")
|
fs.Int32Var(&s.ConcurrentDeploymentSyncs, "concurrent-deployment-syncs", s.ConcurrentDeploymentSyncs, "The number of deployment objects that are allowed to sync concurrently. Larger number = more responsive deployments, but more CPU (and network) load")
|
||||||
fs.Int32Var(&s.ConcurrentNamespaceSyncs, "concurrent-namespace-syncs", s.ConcurrentNamespaceSyncs, "The number of namespace objects that are allowed to sync concurrently. Larger number = more responsive namespace termination, but more CPU (and network) load")
|
fs.Int32Var(&s.ConcurrentNamespaceSyncs, "concurrent-namespace-syncs", s.ConcurrentNamespaceSyncs, "The number of namespace objects that are allowed to sync concurrently. Larger number = more responsive namespace termination, but more CPU (and network) load")
|
||||||
|
fs.Int32Var(&s.ConcurrentSATokenSyncs, "concurrent-serviceaccount-token-syncs", s.ConcurrentSATokenSyncs, "The number of service account token objects that are allowed to sync concurrently. Larger number = more responsive token generation, but more CPU (and network) load")
|
||||||
fs.Int32Var(&s.LookupCacheSizeForRC, "replication-controller-lookup-cache-size", s.LookupCacheSizeForRC, "The the size of lookup cache for replication controllers. Larger number = more responsive replica management, but more MEM load.")
|
fs.Int32Var(&s.LookupCacheSizeForRC, "replication-controller-lookup-cache-size", s.LookupCacheSizeForRC, "The the size of lookup cache for replication controllers. Larger number = more responsive replica management, but more MEM load.")
|
||||||
fs.Int32Var(&s.LookupCacheSizeForRS, "replicaset-lookup-cache-size", s.LookupCacheSizeForRS, "The the size of lookup cache for replicatsets. Larger number = more responsive replica management, but more MEM load.")
|
fs.Int32Var(&s.LookupCacheSizeForRS, "replicaset-lookup-cache-size", s.LookupCacheSizeForRS, "The the size of lookup cache for replicatsets. Larger number = more responsive replica management, but more MEM load.")
|
||||||
fs.Int32Var(&s.LookupCacheSizeForDaemonSet, "daemonset-lookup-cache-size", s.LookupCacheSizeForDaemonSet, "The the size of lookup cache for daemonsets. Larger number = more responsive daemonsets, but more MEM load.")
|
fs.Int32Var(&s.LookupCacheSizeForDaemonSet, "daemonset-lookup-cache-size", s.LookupCacheSizeForDaemonSet, "The the size of lookup cache for daemonsets. Larger number = more responsive daemonsets, but more MEM load.")
|
||||||
|
@ -310,13 +310,13 @@ func (s *CMServer) Run(_ []string) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("Error reading key for service account token controller: %v", err)
|
glog.Errorf("Error reading key for service account token controller: %v", err)
|
||||||
} else {
|
} else {
|
||||||
serviceaccountcontroller.NewTokensController(
|
go serviceaccountcontroller.NewTokensController(
|
||||||
clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "tokens-controller")),
|
clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "tokens-controller")),
|
||||||
serviceaccountcontroller.TokensControllerOptions{
|
serviceaccountcontroller.TokensControllerOptions{
|
||||||
TokenGenerator: serviceaccount.JWTTokenGenerator(privateKey),
|
TokenGenerator: serviceaccount.JWTTokenGenerator(privateKey),
|
||||||
RootCA: rootCA,
|
RootCA: rootCA,
|
||||||
},
|
},
|
||||||
).Run()
|
).Run(int(s.ConcurrentSATokenSyncs), wait.NeverStop)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -65,6 +65,7 @@ concurrent-namespace-syncs
|
|||||||
concurrent-replicaset-syncs
|
concurrent-replicaset-syncs
|
||||||
concurrent-service-syncs
|
concurrent-service-syncs
|
||||||
concurrent-resource-quota-syncs
|
concurrent-resource-quota-syncs
|
||||||
|
concurrent-serviceaccount-token-syncs
|
||||||
config-sync-period
|
config-sync-period
|
||||||
configure-cbr0
|
configure-cbr0
|
||||||
configure-cloud-routes
|
configure-cloud-routes
|
||||||
|
@ -70,6 +70,7 @@ func DeepCopy_componentconfig_KubeControllerManagerConfiguration(in KubeControll
|
|||||||
out.ConcurrentDaemonSetSyncs = in.ConcurrentDaemonSetSyncs
|
out.ConcurrentDaemonSetSyncs = in.ConcurrentDaemonSetSyncs
|
||||||
out.ConcurrentJobSyncs = in.ConcurrentJobSyncs
|
out.ConcurrentJobSyncs = in.ConcurrentJobSyncs
|
||||||
out.ConcurrentNamespaceSyncs = in.ConcurrentNamespaceSyncs
|
out.ConcurrentNamespaceSyncs = in.ConcurrentNamespaceSyncs
|
||||||
|
out.ConcurrentSATokenSyncs = in.ConcurrentSATokenSyncs
|
||||||
out.LookupCacheSizeForRC = in.LookupCacheSizeForRC
|
out.LookupCacheSizeForRC = in.LookupCacheSizeForRC
|
||||||
out.LookupCacheSizeForRS = in.LookupCacheSizeForRS
|
out.LookupCacheSizeForRS = in.LookupCacheSizeForRS
|
||||||
out.LookupCacheSizeForDaemonSet = in.LookupCacheSizeForDaemonSet
|
out.LookupCacheSizeForDaemonSet = in.LookupCacheSizeForDaemonSet
|
||||||
|
File diff suppressed because it is too large
Load Diff
@ -471,6 +471,9 @@ type KubeControllerManagerConfiguration struct {
|
|||||||
// concurrentNamespaceSyncs is the number of namespace objects that are
|
// concurrentNamespaceSyncs is the number of namespace objects that are
|
||||||
// allowed to sync concurrently.
|
// allowed to sync concurrently.
|
||||||
ConcurrentNamespaceSyncs int32 `json:"concurrentNamespaceSyncs"`
|
ConcurrentNamespaceSyncs int32 `json:"concurrentNamespaceSyncs"`
|
||||||
|
// concurrentSATokenSyncs is the number of service account token syncing operations
|
||||||
|
// that will be done concurrently.
|
||||||
|
ConcurrentSATokenSyncs int32 `json:"concurrentSATokenSyncs"`
|
||||||
// lookupCacheSizeForRC is the size of lookup cache for replication controllers.
|
// lookupCacheSizeForRC is the size of lookup cache for replication controllers.
|
||||||
// Larger number = more responsive replica management, but more MEM load.
|
// Larger number = more responsive replica management, but more MEM load.
|
||||||
LookupCacheSizeForRC int32 `json:"lookupCacheSizeForRC"`
|
LookupCacheSizeForRC int32 `json:"lookupCacheSizeForRC"`
|
||||||
|
@ -32,10 +32,13 @@ import (
|
|||||||
"k8s.io/kubernetes/pkg/registry/secret"
|
"k8s.io/kubernetes/pkg/registry/secret"
|
||||||
"k8s.io/kubernetes/pkg/runtime"
|
"k8s.io/kubernetes/pkg/runtime"
|
||||||
"k8s.io/kubernetes/pkg/serviceaccount"
|
"k8s.io/kubernetes/pkg/serviceaccount"
|
||||||
|
"k8s.io/kubernetes/pkg/types"
|
||||||
|
utilerrors "k8s.io/kubernetes/pkg/util/errors"
|
||||||
"k8s.io/kubernetes/pkg/util/metrics"
|
"k8s.io/kubernetes/pkg/util/metrics"
|
||||||
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
|
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
|
||||||
"k8s.io/kubernetes/pkg/util/sets"
|
"k8s.io/kubernetes/pkg/util/sets"
|
||||||
"k8s.io/kubernetes/pkg/util/wait"
|
"k8s.io/kubernetes/pkg/util/wait"
|
||||||
|
"k8s.io/kubernetes/pkg/util/workqueue"
|
||||||
"k8s.io/kubernetes/pkg/watch"
|
"k8s.io/kubernetes/pkg/watch"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -58,21 +61,36 @@ type TokensControllerOptions struct {
|
|||||||
// SecretResync is the time.Duration at which to fully re-list secrets.
|
// SecretResync is the time.Duration at which to fully re-list secrets.
|
||||||
// If zero, re-list will be delayed as long as possible
|
// If zero, re-list will be delayed as long as possible
|
||||||
SecretResync time.Duration
|
SecretResync time.Duration
|
||||||
// This CA will be added in the secretes of service accounts
|
// This CA will be added in the secrets of service accounts
|
||||||
RootCA []byte
|
RootCA []byte
|
||||||
|
|
||||||
|
// MaxRetries controls the maximum number of times a particular key is retried before giving up
|
||||||
|
// If zero, a default max is used
|
||||||
|
MaxRetries int
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewTokensController returns a new *TokensController.
|
// NewTokensController returns a new *TokensController.
|
||||||
func NewTokensController(cl clientset.Interface, options TokensControllerOptions) *TokensController {
|
func NewTokensController(cl clientset.Interface, options TokensControllerOptions) *TokensController {
|
||||||
|
maxRetries := options.MaxRetries
|
||||||
|
if maxRetries == 0 {
|
||||||
|
maxRetries = 10
|
||||||
|
}
|
||||||
|
|
||||||
e := &TokensController{
|
e := &TokensController{
|
||||||
client: cl,
|
client: cl,
|
||||||
token: options.TokenGenerator,
|
token: options.TokenGenerator,
|
||||||
rootCA: options.RootCA,
|
rootCA: options.RootCA,
|
||||||
|
|
||||||
|
syncServiceAccountQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
|
||||||
|
syncSecretQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
|
||||||
|
|
||||||
|
maxRetries: maxRetries,
|
||||||
}
|
}
|
||||||
if cl != nil && cl.Core().GetRESTClient().GetRateLimiter() != nil {
|
if cl != nil && cl.Core().GetRESTClient().GetRateLimiter() != nil {
|
||||||
metrics.RegisterMetricAndTrackRateLimiterUsage("serviceaccount_controller", cl.Core().GetRESTClient().GetRateLimiter())
|
metrics.RegisterMetricAndTrackRateLimiterUsage("serviceaccount_controller", cl.Core().GetRESTClient().GetRateLimiter())
|
||||||
}
|
}
|
||||||
e.serviceAccounts, e.serviceAccountController = framework.NewIndexerInformer(
|
|
||||||
|
e.serviceAccounts, e.serviceAccountController = framework.NewInformer(
|
||||||
&cache.ListWatch{
|
&cache.ListWatch{
|
||||||
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
|
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
|
||||||
return e.client.Core().ServiceAccounts(api.NamespaceAll).List(options)
|
return e.client.Core().ServiceAccounts(api.NamespaceAll).List(options)
|
||||||
@ -84,11 +102,10 @@ func NewTokensController(cl clientset.Interface, options TokensControllerOptions
|
|||||||
&api.ServiceAccount{},
|
&api.ServiceAccount{},
|
||||||
options.ServiceAccountResync,
|
options.ServiceAccountResync,
|
||||||
framework.ResourceEventHandlerFuncs{
|
framework.ResourceEventHandlerFuncs{
|
||||||
AddFunc: e.serviceAccountAdded,
|
AddFunc: e.queueServiceAccountSync,
|
||||||
UpdateFunc: e.serviceAccountUpdated,
|
UpdateFunc: e.queueServiceAccountUpdateSync,
|
||||||
DeleteFunc: e.serviceAccountDeleted,
|
DeleteFunc: e.queueServiceAccountSync,
|
||||||
},
|
},
|
||||||
cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc},
|
|
||||||
)
|
)
|
||||||
|
|
||||||
tokenSelector := fields.SelectorFromSet(map[string]string{api.SecretTypeField: string(api.SecretTypeServiceAccountToken)})
|
tokenSelector := fields.SelectorFromSet(map[string]string{api.SecretTypeField: string(api.SecretTypeServiceAccountToken)})
|
||||||
@ -106,206 +123,277 @@ func NewTokensController(cl clientset.Interface, options TokensControllerOptions
|
|||||||
&api.Secret{},
|
&api.Secret{},
|
||||||
options.SecretResync,
|
options.SecretResync,
|
||||||
framework.ResourceEventHandlerFuncs{
|
framework.ResourceEventHandlerFuncs{
|
||||||
AddFunc: e.secretAdded,
|
AddFunc: e.queueSecretSync,
|
||||||
UpdateFunc: e.secretUpdated,
|
UpdateFunc: e.queueSecretUpdateSync,
|
||||||
DeleteFunc: e.secretDeleted,
|
DeleteFunc: e.queueSecretSync,
|
||||||
},
|
},
|
||||||
cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc},
|
cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc},
|
||||||
)
|
)
|
||||||
|
|
||||||
e.serviceAccountsSynced = e.serviceAccountController.HasSynced
|
|
||||||
e.secretsSynced = e.secretController.HasSynced
|
|
||||||
|
|
||||||
return e
|
return e
|
||||||
}
|
}
|
||||||
|
|
||||||
// TokensController manages ServiceAccountToken secrets for ServiceAccount objects
|
// TokensController manages ServiceAccountToken secrets for ServiceAccount objects
|
||||||
type TokensController struct {
|
type TokensController struct {
|
||||||
stopChan chan struct{}
|
|
||||||
|
|
||||||
client clientset.Interface
|
client clientset.Interface
|
||||||
token serviceaccount.TokenGenerator
|
token serviceaccount.TokenGenerator
|
||||||
|
|
||||||
rootCA []byte
|
rootCA []byte
|
||||||
|
|
||||||
serviceAccounts cache.Indexer
|
serviceAccounts cache.Store
|
||||||
secrets cache.Indexer
|
secrets cache.Indexer
|
||||||
|
|
||||||
// Since we join two objects, we'll watch both of them with controllers.
|
// Since we join two objects, we'll watch both of them with controllers.
|
||||||
serviceAccountController *framework.Controller
|
serviceAccountController *framework.Controller
|
||||||
secretController *framework.Controller
|
secretController *framework.Controller
|
||||||
|
|
||||||
// These are here so tests can inject a 'return true'.
|
// syncServiceAccountQueue handles service account events:
|
||||||
serviceAccountsSynced func() bool
|
// * ensures a referenced token exists for service accounts which still exist
|
||||||
secretsSynced func() bool
|
// * ensures tokens are removed for service accounts which no longer exist
|
||||||
|
// key is "<namespace>/<name>/<uid>"
|
||||||
|
syncServiceAccountQueue workqueue.RateLimitingInterface
|
||||||
|
|
||||||
|
// syncSecretQueue handles secret events:
|
||||||
|
// * deletes tokens whose service account no longer exists
|
||||||
|
// * updates tokens with missing token or namespace data, or mismatched ca data
|
||||||
|
// * ensures service account secret references are removed for tokens which are deleted
|
||||||
|
// key is a secretQueueKey{}
|
||||||
|
syncSecretQueue workqueue.RateLimitingInterface
|
||||||
|
|
||||||
|
maxRetries int
|
||||||
}
|
}
|
||||||
|
|
||||||
// Runs controller loops and returns immediately
|
// Runs controller blocks until stopCh is closed
|
||||||
func (e *TokensController) Run() {
|
func (e *TokensController) Run(workers int, stopCh <-chan struct{}) {
|
||||||
if e.stopChan == nil {
|
defer utilruntime.HandleCrash()
|
||||||
e.stopChan = make(chan struct{})
|
|
||||||
go e.serviceAccountController.Run(e.stopChan)
|
// Start controllers (to fill stores, call informers, fill work queues)
|
||||||
go e.secretController.Run(e.stopChan)
|
go e.serviceAccountController.Run(stopCh)
|
||||||
|
go e.secretController.Run(stopCh)
|
||||||
|
|
||||||
|
// Wait for stores to fill
|
||||||
|
for !e.serviceAccountController.HasSynced() || !e.secretController.HasSynced() {
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Spawn workers to process work queues
|
||||||
|
for i := 0; i < workers; i++ {
|
||||||
|
go wait.Until(e.syncServiceAccount, 0, stopCh)
|
||||||
|
go wait.Until(e.syncSecret, 0, stopCh)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Block until stop channel is closed
|
||||||
|
<-stopCh
|
||||||
|
|
||||||
|
// Shut down queues
|
||||||
|
e.syncServiceAccountQueue.ShutDown()
|
||||||
|
e.syncSecretQueue.ShutDown()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *TokensController) queueServiceAccountSync(obj interface{}) {
|
||||||
|
if serviceAccount, ok := obj.(*api.ServiceAccount); ok {
|
||||||
|
e.syncServiceAccountQueue.Add(makeServiceAccountKey(serviceAccount))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stop gracefully shuts down this controller
|
func (e *TokensController) queueServiceAccountUpdateSync(oldObj interface{}, newObj interface{}) {
|
||||||
func (e *TokensController) Stop() {
|
if serviceAccount, ok := newObj.(*api.ServiceAccount); ok {
|
||||||
if e.stopChan != nil {
|
e.syncServiceAccountQueue.Add(makeServiceAccountKey(serviceAccount))
|
||||||
close(e.stopChan)
|
|
||||||
e.stopChan = nil
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// serviceAccountAdded reacts to a ServiceAccount creation by creating a corresponding ServiceAccountToken Secret
|
// complete optionally requeues key, then calls queue.Done(key)
|
||||||
func (e *TokensController) serviceAccountAdded(obj interface{}) {
|
func (e *TokensController) retryOrForget(queue workqueue.RateLimitingInterface, key interface{}, requeue bool) {
|
||||||
serviceAccount := obj.(*api.ServiceAccount)
|
if !requeue {
|
||||||
err := e.createSecretIfNeeded(serviceAccount)
|
queue.Forget(key)
|
||||||
if err != nil {
|
|
||||||
glog.Error(err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// serviceAccountUpdated reacts to a ServiceAccount update (or re-list) by ensuring a corresponding ServiceAccountToken Secret exists
|
|
||||||
func (e *TokensController) serviceAccountUpdated(oldObj interface{}, newObj interface{}) {
|
|
||||||
newServiceAccount := newObj.(*api.ServiceAccount)
|
|
||||||
err := e.createSecretIfNeeded(newServiceAccount)
|
|
||||||
if err != nil {
|
|
||||||
glog.Error(err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// serviceAccountDeleted reacts to a ServiceAccount deletion by deleting all corresponding ServiceAccountToken Secrets
|
|
||||||
func (e *TokensController) serviceAccountDeleted(obj interface{}) {
|
|
||||||
serviceAccount, ok := obj.(*api.ServiceAccount)
|
|
||||||
if !ok {
|
|
||||||
// Unknown type. If we missed a ServiceAccount deletion, the
|
|
||||||
// corresponding secrets will be cleaned up during the Secret re-list
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
secrets, err := e.listTokenSecrets(serviceAccount)
|
|
||||||
|
requeueCount := queue.NumRequeues(key)
|
||||||
|
if requeueCount < e.maxRetries {
|
||||||
|
queue.AddRateLimited(key)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
glog.V(4).Infof("retried %d times: %#v", requeueCount, key)
|
||||||
|
queue.Forget(key)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *TokensController) queueSecretSync(obj interface{}) {
|
||||||
|
if secret, ok := obj.(*api.Secret); ok {
|
||||||
|
e.syncSecretQueue.Add(makeSecretQueueKey(secret))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *TokensController) queueSecretUpdateSync(oldObj interface{}, newObj interface{}) {
|
||||||
|
if secret, ok := newObj.(*api.Secret); ok {
|
||||||
|
e.syncSecretQueue.Add(makeSecretQueueKey(secret))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *TokensController) syncServiceAccount() {
|
||||||
|
key, quit := e.syncServiceAccountQueue.Get()
|
||||||
|
if quit {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer e.syncServiceAccountQueue.Done(key)
|
||||||
|
|
||||||
|
retry := false
|
||||||
|
defer func() {
|
||||||
|
e.retryOrForget(e.syncServiceAccountQueue, key, retry)
|
||||||
|
}()
|
||||||
|
|
||||||
|
saInfo, err := parseServiceAccountKey(key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Error(err)
|
glog.Error(err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
for _, secret := range secrets {
|
|
||||||
glog.V(4).Infof("Deleting secret %s/%s because service account %s was deleted", secret.Namespace, secret.Name, serviceAccount.Name)
|
sa, err := e.getServiceAccount(saInfo.namespace, saInfo.name, saInfo.uid, false)
|
||||||
if err := e.deleteSecret(secret); err != nil {
|
switch {
|
||||||
glog.Errorf("Error deleting secret %s/%s: %v", secret.Namespace, secret.Name, err)
|
case err != nil:
|
||||||
|
glog.Error(err)
|
||||||
|
retry = true
|
||||||
|
case sa == nil:
|
||||||
|
// service account no longer exists, so delete related tokens
|
||||||
|
glog.V(4).Infof("syncServiceAccount(%s/%s), service account deleted, removing tokens", saInfo.namespace, saInfo.name)
|
||||||
|
sa = &api.ServiceAccount{ObjectMeta: api.ObjectMeta{Namespace: saInfo.namespace, Name: saInfo.name, UID: saInfo.uid}}
|
||||||
|
if retriable, err := e.deleteTokens(sa); err != nil {
|
||||||
|
glog.Errorf("error deleting serviceaccount tokens for %s/%s: %v", saInfo.namespace, saInfo.name, err)
|
||||||
|
retry = retriable
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
// ensure a token exists and is referenced by this service account
|
||||||
|
if retriable, err := e.ensureReferencedToken(sa); err != nil {
|
||||||
|
glog.Errorf("error synchronizing serviceaccount %s/%s: %v", saInfo.namespace, saInfo.name, err)
|
||||||
|
retry = retriable
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// secretAdded reacts to a Secret create by ensuring the referenced ServiceAccount exists, and by adding a token to the secret if needed
|
func (e *TokensController) syncSecret() {
|
||||||
func (e *TokensController) secretAdded(obj interface{}) {
|
key, quit := e.syncSecretQueue.Get()
|
||||||
secret := obj.(*api.Secret)
|
if quit {
|
||||||
serviceAccount, err := e.getServiceAccount(secret, true)
|
return
|
||||||
|
}
|
||||||
|
defer e.syncSecretQueue.Done(key)
|
||||||
|
|
||||||
|
// Track whether or not we should retry this sync
|
||||||
|
retry := false
|
||||||
|
defer func() {
|
||||||
|
e.retryOrForget(e.syncSecretQueue, key, retry)
|
||||||
|
}()
|
||||||
|
|
||||||
|
secretInfo, err := parseSecretQueueKey(key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Error(err)
|
glog.Error(err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if serviceAccount == nil {
|
|
||||||
glog.V(2).Infof(
|
|
||||||
"Deleting new secret %s/%s because service account %s (uid=%s) was not found",
|
|
||||||
secret.Namespace, secret.Name,
|
|
||||||
secret.Annotations[api.ServiceAccountNameKey], secret.Annotations[api.ServiceAccountUIDKey])
|
|
||||||
if err := e.deleteSecret(secret); err != nil {
|
|
||||||
glog.Errorf("Error deleting secret %s/%s: %v", secret.Namespace, secret.Name, err)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
e.generateTokenIfNeeded(serviceAccount, secret)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// secretUpdated reacts to a Secret update (or re-list) by deleting the secret (if the referenced ServiceAccount does not exist)
|
secret, err := e.getSecret(secretInfo.namespace, secretInfo.name, secretInfo.uid, false)
|
||||||
func (e *TokensController) secretUpdated(oldObj interface{}, newObj interface{}) {
|
switch {
|
||||||
newSecret := newObj.(*api.Secret)
|
case err != nil:
|
||||||
newServiceAccount, err := e.getServiceAccount(newSecret, true)
|
|
||||||
if err != nil {
|
|
||||||
glog.Error(err)
|
glog.Error(err)
|
||||||
return
|
retry = true
|
||||||
}
|
case secret == nil:
|
||||||
if newServiceAccount == nil {
|
// If the service account exists
|
||||||
glog.V(2).Infof(
|
if sa, saErr := e.getServiceAccount(secretInfo.namespace, secretInfo.saName, secretInfo.saUID, false); saErr == nil && sa != nil {
|
||||||
"Deleting updated secret %s/%s because service account %s (uid=%s) was not found",
|
// secret no longer exists, so delete references to this secret from the service account
|
||||||
newSecret.Namespace, newSecret.Name,
|
if err := client.RetryOnConflict(RemoveTokenBackoff, func() error {
|
||||||
newSecret.Annotations[api.ServiceAccountNameKey], newSecret.Annotations[api.ServiceAccountUIDKey])
|
return e.removeSecretReference(secretInfo.namespace, secretInfo.saName, secretInfo.saUID, secretInfo.name)
|
||||||
if err := e.deleteSecret(newSecret); err != nil {
|
}); err != nil {
|
||||||
glog.Errorf("Error deleting secret %s/%s: %v", newSecret.Namespace, newSecret.Name, err)
|
glog.Error(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
// Ensure service account exists
|
||||||
|
sa, saErr := e.getServiceAccount(secretInfo.namespace, secretInfo.saName, secretInfo.saUID, true)
|
||||||
|
switch {
|
||||||
|
case saErr != nil:
|
||||||
|
glog.Error(saErr)
|
||||||
|
retry = true
|
||||||
|
case sa == nil:
|
||||||
|
// Delete token
|
||||||
|
glog.V(4).Infof("syncSecret(%s/%s), service account does not exist, deleting token", secretInfo.namespace, secretInfo.name)
|
||||||
|
if retriable, err := e.deleteToken(secretInfo.namespace, secretInfo.name, secretInfo.uid); err != nil {
|
||||||
|
glog.Errorf("error deleting serviceaccount token %s/%s for service account %s: %v", secretInfo.namespace, secretInfo.name, secretInfo.saName, err)
|
||||||
|
retry = retriable
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
// Update token if needed
|
||||||
|
if retriable, err := e.generateTokenIfNeeded(sa, secret); err != nil {
|
||||||
|
glog.Errorf("error populating serviceaccount token %s/%s for service account %s: %v", secretInfo.namespace, secretInfo.name, secretInfo.saName, err)
|
||||||
|
retry = retriable
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
e.generateTokenIfNeeded(newServiceAccount, newSecret)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// secretDeleted reacts to a Secret being deleted by removing a reference from the corresponding ServiceAccount if needed
|
func (e *TokensController) deleteTokens(serviceAccount *api.ServiceAccount) ( /*retry*/ bool, error) {
|
||||||
func (e *TokensController) secretDeleted(obj interface{}) {
|
tokens, err := e.listTokenSecrets(serviceAccount)
|
||||||
secret, ok := obj.(*api.Secret)
|
|
||||||
if !ok {
|
|
||||||
// Unknown type. If we missed a Secret deletion, the corresponding ServiceAccount (if it exists)
|
|
||||||
// will get a secret recreated (if needed) during the ServiceAccount re-list
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
serviceAccount, err := e.getServiceAccount(secret, false)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Error(err)
|
// don't retry on cache lookup errors
|
||||||
return
|
return false, err
|
||||||
}
|
}
|
||||||
if serviceAccount == nil {
|
retry := false
|
||||||
return
|
errs := []error{}
|
||||||
}
|
for _, token := range tokens {
|
||||||
|
r, err := e.deleteToken(token.Namespace, token.Name, token.UID)
|
||||||
if err := client.RetryOnConflict(RemoveTokenBackoff, func() error {
|
if err != nil {
|
||||||
return e.removeSecretReferenceIfNeeded(serviceAccount, secret.Name)
|
errs = append(errs, err)
|
||||||
}); err != nil {
|
}
|
||||||
utilruntime.HandleError(err)
|
if r {
|
||||||
|
retry = true
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
return retry, utilerrors.NewAggregate(errs)
|
||||||
}
|
}
|
||||||
|
|
||||||
// createSecretIfNeeded makes sure at least one ServiceAccountToken secret exists, and is included in the serviceAccount's Secrets list
|
func (e *TokensController) deleteToken(ns, name string, uid types.UID) ( /*retry*/ bool, error) {
|
||||||
func (e *TokensController) createSecretIfNeeded(serviceAccount *api.ServiceAccount) error {
|
var opts *api.DeleteOptions
|
||||||
// If the service account references no secrets, short-circuit and create a new one
|
if len(uid) > 0 {
|
||||||
if len(serviceAccount.Secrets) == 0 {
|
opts = &api.DeleteOptions{Preconditions: &api.Preconditions{UID: &uid}}
|
||||||
return e.createSecret(serviceAccount)
|
|
||||||
}
|
}
|
||||||
|
err := e.client.Core().Secrets(ns).Delete(name, opts)
|
||||||
|
// NotFound doesn't need a retry (it's already been deleted)
|
||||||
|
// Conflict doesn't need a retry (the UID precondition failed)
|
||||||
|
if err == nil || apierrors.IsNotFound(err) || apierrors.IsConflict(err) {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
// Retry for any other error
|
||||||
|
return true, err
|
||||||
|
}
|
||||||
|
|
||||||
// We shouldn't try to validate secret references until the secrets store is synced
|
// ensureReferencedToken makes sure at least one ServiceAccountToken secret exists, and is included in the serviceAccount's Secrets list
|
||||||
if !e.secretsSynced() {
|
func (e *TokensController) ensureReferencedToken(serviceAccount *api.ServiceAccount) ( /* retry */ bool, error) {
|
||||||
return nil
|
if len(serviceAccount.Secrets) > 0 {
|
||||||
}
|
allSecrets, err := e.listTokenSecrets(serviceAccount)
|
||||||
|
if err != nil {
|
||||||
// If any existing token secrets are referenced by the service account, return
|
// Don't retry cache lookup errors
|
||||||
allSecrets, err := e.listTokenSecrets(serviceAccount)
|
return false, err
|
||||||
if err != nil {
|
}
|
||||||
return err
|
referencedSecrets := getSecretReferences(serviceAccount)
|
||||||
}
|
for _, secret := range allSecrets {
|
||||||
referencedSecrets := getSecretReferences(serviceAccount)
|
if referencedSecrets.Has(secret.Name) {
|
||||||
for _, secret := range allSecrets {
|
// A service account token already exists, and is referenced, short-circuit
|
||||||
if referencedSecrets.Has(secret.Name) {
|
return false, nil
|
||||||
return nil
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Otherwise create a new token secret
|
|
||||||
return e.createSecret(serviceAccount)
|
|
||||||
}
|
|
||||||
|
|
||||||
// createSecret creates a secret of type ServiceAccountToken for the given ServiceAccount
|
|
||||||
func (e *TokensController) createSecret(serviceAccount *api.ServiceAccount) error {
|
|
||||||
// We don't want to update the cache's copy of the service account
|
// We don't want to update the cache's copy of the service account
|
||||||
// so add the secret to a freshly retrieved copy of the service account
|
// so add the secret to a freshly retrieved copy of the service account
|
||||||
serviceAccounts := e.client.Core().ServiceAccounts(serviceAccount.Namespace)
|
serviceAccounts := e.client.Core().ServiceAccounts(serviceAccount.Namespace)
|
||||||
liveServiceAccount, err := serviceAccounts.Get(serviceAccount.Name)
|
liveServiceAccount, err := serviceAccounts.Get(serviceAccount.Name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
// Retry for any error other than a NotFound
|
||||||
|
return !apierrors.IsNotFound(err), err
|
||||||
}
|
}
|
||||||
if liveServiceAccount.ResourceVersion != serviceAccount.ResourceVersion {
|
if liveServiceAccount.ResourceVersion != serviceAccount.ResourceVersion {
|
||||||
// our view of the service account is not up to date
|
// our view of the service account is not up to date
|
||||||
// we'll get notified of an update event later and get to try again
|
// we'll get notified of an update event later and get to try again
|
||||||
// this only prevent interactions between successive runs of this controller's event handlers, but that is useful
|
glog.V(2).Infof("serviceaccount %s/%s is not up to date, skipping token creation", serviceAccount.Namespace, serviceAccount.Name)
|
||||||
glog.V(2).Infof("View of ServiceAccount %s/%s is not up to date, skipping token creation", serviceAccount.Namespace, serviceAccount.Name)
|
return false, nil
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Build the secret
|
// Build the secret
|
||||||
@ -325,7 +413,8 @@ func (e *TokensController) createSecret(serviceAccount *api.ServiceAccount) erro
|
|||||||
// Generate the token
|
// Generate the token
|
||||||
token, err := e.token.GenerateToken(*serviceAccount, *secret)
|
token, err := e.token.GenerateToken(*serviceAccount, *secret)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
// retriable error
|
||||||
|
return true, err
|
||||||
}
|
}
|
||||||
secret.Data[api.ServiceAccountTokenKey] = []byte(token)
|
secret.Data[api.ServiceAccountTokenKey] = []byte(token)
|
||||||
secret.Data[api.ServiceAccountNamespaceKey] = []byte(serviceAccount.Namespace)
|
secret.Data[api.ServiceAccountNamespaceKey] = []byte(serviceAccount.Namespace)
|
||||||
@ -334,41 +423,39 @@ func (e *TokensController) createSecret(serviceAccount *api.ServiceAccount) erro
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Save the secret
|
// Save the secret
|
||||||
if createdToken, err := e.client.Core().Secrets(serviceAccount.Namespace).Create(secret); err != nil {
|
createdToken, err := e.client.Core().Secrets(serviceAccount.Namespace).Create(secret)
|
||||||
return err
|
if err != nil {
|
||||||
} else {
|
// retriable error
|
||||||
// Manually add the new token to the cache store.
|
return true, err
|
||||||
// This prevents the service account update (below) triggering another token creation, if the referenced token couldn't be found in the store
|
|
||||||
e.secrets.Add(createdToken)
|
|
||||||
}
|
}
|
||||||
|
// Manually add the new token to the cache store.
|
||||||
|
// This prevents the service account update (below) triggering another token creation, if the referenced token couldn't be found in the store
|
||||||
|
e.secrets.Add(createdToken)
|
||||||
|
|
||||||
liveServiceAccount.Secrets = append(liveServiceAccount.Secrets, api.ObjectReference{Name: secret.Name})
|
liveServiceAccount.Secrets = append(liveServiceAccount.Secrets, api.ObjectReference{Name: secret.Name})
|
||||||
|
|
||||||
_, err = serviceAccounts.Update(liveServiceAccount)
|
if _, err = serviceAccounts.Update(liveServiceAccount); err != nil {
|
||||||
if err != nil {
|
|
||||||
// we weren't able to use the token, try to clean it up.
|
// we weren't able to use the token, try to clean it up.
|
||||||
glog.V(2).Infof("Deleting secret %s/%s because reference couldn't be added (%v)", secret.Namespace, secret.Name, err)
|
glog.V(2).Infof("deleting secret %s/%s because reference couldn't be added (%v)", secret.Namespace, secret.Name, err)
|
||||||
if err := e.client.Core().Secrets(secret.Namespace).Delete(secret.Name, nil); err != nil {
|
deleteOpts := &api.DeleteOptions{Preconditions: &api.Preconditions{UID: &createdToken.UID}}
|
||||||
glog.Error(err) // if we fail, just log it
|
if deleteErr := e.client.Core().Secrets(createdToken.Namespace).Delete(createdToken.Name, deleteOpts); deleteErr != nil {
|
||||||
|
glog.Error(deleteErr) // if we fail, just log it
|
||||||
}
|
}
|
||||||
}
|
|
||||||
if apierrors.IsConflict(err) {
|
if apierrors.IsConflict(err) || apierrors.IsNotFound(err) {
|
||||||
// nothing to do. We got a conflict, that means that the service account was updated. We simply need to return because we'll get an update notification later
|
// if we got a Conflict error, the service account was updated by someone else, and we'll get an update notification later
|
||||||
return nil
|
// if we got a NotFound error, the service account no longer exists, and we don't need to create a token for it
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
// retry in all other cases
|
||||||
|
return true, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return err
|
// success!
|
||||||
|
return false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// generateTokenIfNeeded populates the token data for the given Secret if not already set
|
func (e *TokensController) secretUpdateNeeded(secret *api.Secret) (bool, bool, bool) {
|
||||||
func (e *TokensController) generateTokenIfNeeded(serviceAccount *api.ServiceAccount, secret *api.Secret) error {
|
|
||||||
if secret.Annotations == nil {
|
|
||||||
secret.Annotations = map[string]string{}
|
|
||||||
}
|
|
||||||
if secret.Data == nil {
|
|
||||||
secret.Data = map[string][]byte{}
|
|
||||||
}
|
|
||||||
|
|
||||||
caData := secret.Data[api.ServiceAccountRootCAKey]
|
caData := secret.Data[api.ServiceAccountRootCAKey]
|
||||||
needsCA := len(e.rootCA) > 0 && bytes.Compare(caData, e.rootCA) != 0
|
needsCA := len(e.rootCA) > 0 && bytes.Compare(caData, e.rootCA) != 0
|
||||||
|
|
||||||
@ -377,60 +464,103 @@ func (e *TokensController) generateTokenIfNeeded(serviceAccount *api.ServiceAcco
|
|||||||
tokenData := secret.Data[api.ServiceAccountTokenKey]
|
tokenData := secret.Data[api.ServiceAccountTokenKey]
|
||||||
needsToken := len(tokenData) == 0
|
needsToken := len(tokenData) == 0
|
||||||
|
|
||||||
|
return needsCA, needsNamespace, needsToken
|
||||||
|
}
|
||||||
|
|
||||||
|
// generateTokenIfNeeded populates the token data for the given Secret if not already set
|
||||||
|
func (e *TokensController) generateTokenIfNeeded(serviceAccount *api.ServiceAccount, cachedSecret *api.Secret) ( /* retry */ bool, error) {
|
||||||
|
// Check the cached secret to see if changes are needed
|
||||||
|
if needsCA, needsNamespace, needsToken := e.secretUpdateNeeded(cachedSecret); !needsCA && !needsToken && !needsNamespace {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// We don't want to update the cache's copy of the secret
|
||||||
|
// so add the token to a freshly retrieved copy of the secret
|
||||||
|
secrets := e.client.Core().Secrets(cachedSecret.Namespace)
|
||||||
|
liveSecret, err := secrets.Get(cachedSecret.Name)
|
||||||
|
if err != nil {
|
||||||
|
// Retry for any error other than a NotFound
|
||||||
|
return !apierrors.IsNotFound(err), err
|
||||||
|
}
|
||||||
|
if liveSecret.ResourceVersion != cachedSecret.ResourceVersion {
|
||||||
|
// our view of the secret is not up to date
|
||||||
|
// we'll get notified of an update event later and get to try again
|
||||||
|
glog.V(2).Infof("secret %s/%s is not up to date, skipping token population", liveSecret.Namespace, liveSecret.Name)
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
needsCA, needsNamespace, needsToken := e.secretUpdateNeeded(liveSecret)
|
||||||
if !needsCA && !needsToken && !needsNamespace {
|
if !needsCA && !needsToken && !needsNamespace {
|
||||||
return nil
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if liveSecret.Annotations == nil {
|
||||||
|
liveSecret.Annotations = map[string]string{}
|
||||||
|
}
|
||||||
|
if liveSecret.Data == nil {
|
||||||
|
liveSecret.Data = map[string][]byte{}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Set the CA
|
// Set the CA
|
||||||
if needsCA {
|
if needsCA {
|
||||||
secret.Data[api.ServiceAccountRootCAKey] = e.rootCA
|
liveSecret.Data[api.ServiceAccountRootCAKey] = e.rootCA
|
||||||
}
|
}
|
||||||
// Set the namespace
|
// Set the namespace
|
||||||
if needsNamespace {
|
if needsNamespace {
|
||||||
secret.Data[api.ServiceAccountNamespaceKey] = []byte(secret.Namespace)
|
liveSecret.Data[api.ServiceAccountNamespaceKey] = []byte(liveSecret.Namespace)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Generate the token
|
// Generate the token
|
||||||
if needsToken {
|
if needsToken {
|
||||||
token, err := e.token.GenerateToken(*serviceAccount, *secret)
|
token, err := e.token.GenerateToken(*serviceAccount, *liveSecret)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return false, err
|
||||||
}
|
}
|
||||||
secret.Data[api.ServiceAccountTokenKey] = []byte(token)
|
liveSecret.Data[api.ServiceAccountTokenKey] = []byte(token)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Set annotations
|
// Set annotations
|
||||||
secret.Annotations[api.ServiceAccountNameKey] = serviceAccount.Name
|
liveSecret.Annotations[api.ServiceAccountNameKey] = serviceAccount.Name
|
||||||
secret.Annotations[api.ServiceAccountUIDKey] = string(serviceAccount.UID)
|
liveSecret.Annotations[api.ServiceAccountUIDKey] = string(serviceAccount.UID)
|
||||||
|
|
||||||
// Save the secret
|
// Save the secret
|
||||||
if _, err := e.client.Core().Secrets(secret.Namespace).Update(secret); err != nil {
|
_, err = secrets.Update(liveSecret)
|
||||||
return err
|
if apierrors.IsConflict(err) || apierrors.IsNotFound(err) {
|
||||||
|
// if we got a Conflict error, the secret was updated by someone else, and we'll get an update notification later
|
||||||
|
// if we got a NotFound error, the secret no longer exists, and we don't need to populate a token
|
||||||
|
return false, nil
|
||||||
}
|
}
|
||||||
return nil
|
if err != nil {
|
||||||
|
return true, err
|
||||||
|
}
|
||||||
|
return false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// deleteSecret deletes the given secret
|
// removeSecretReference updates the given ServiceAccount to remove a reference to the given secretName if needed.
|
||||||
func (e *TokensController) deleteSecret(secret *api.Secret) error {
|
func (e *TokensController) removeSecretReference(saNamespace string, saName string, saUID types.UID, secretName string) error {
|
||||||
return e.client.Core().Secrets(secret.Namespace).Delete(secret.Name, nil)
|
|
||||||
}
|
|
||||||
|
|
||||||
// removeSecretReferenceIfNeeded updates the given ServiceAccount to remove a reference to the given secretName if needed.
|
|
||||||
// Returns whether an update was performed, and any error that occurred
|
|
||||||
func (e *TokensController) removeSecretReferenceIfNeeded(serviceAccount *api.ServiceAccount, secretName string) error {
|
|
||||||
// We don't want to update the cache's copy of the service account
|
// We don't want to update the cache's copy of the service account
|
||||||
// so remove the secret from a freshly retrieved copy of the service account
|
// so remove the secret from a freshly retrieved copy of the service account
|
||||||
serviceAccounts := e.client.Core().ServiceAccounts(serviceAccount.Namespace)
|
serviceAccounts := e.client.Core().ServiceAccounts(saNamespace)
|
||||||
serviceAccount, err := serviceAccounts.Get(serviceAccount.Name)
|
serviceAccount, err := serviceAccounts.Get(saName)
|
||||||
|
// Ignore NotFound errors when attempting to remove a reference
|
||||||
|
if apierrors.IsNotFound(err) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Double-check to see if the account still references the secret
|
// Short-circuit if the UID doesn't match
|
||||||
|
if len(saUID) > 0 && saUID != serviceAccount.UID {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Short-circuit if the secret is no longer referenced
|
||||||
if !getSecretReferences(serviceAccount).Has(secretName) {
|
if !getSecretReferences(serviceAccount).Has(secretName) {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Remove the secret
|
||||||
secrets := []api.ObjectReference{}
|
secrets := []api.ObjectReference{}
|
||||||
for _, s := range serviceAccount.Secrets {
|
for _, s := range serviceAccount.Secrets {
|
||||||
if s.Name != secretName {
|
if s.Name != secretName {
|
||||||
@ -438,59 +568,90 @@ func (e *TokensController) removeSecretReferenceIfNeeded(serviceAccount *api.Ser
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
serviceAccount.Secrets = secrets
|
serviceAccount.Secrets = secrets
|
||||||
|
|
||||||
_, err = serviceAccounts.Update(serviceAccount)
|
_, err = serviceAccounts.Update(serviceAccount)
|
||||||
if err != nil {
|
// Ignore NotFound errors when attempting to remove a reference
|
||||||
return err
|
if apierrors.IsNotFound(err) {
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
return err
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// getServiceAccount returns the ServiceAccount referenced by the given secret. If the secret is not
|
func (e *TokensController) getServiceAccount(ns string, name string, uid types.UID, fetchOnCacheMiss bool) (*api.ServiceAccount, error) {
|
||||||
// of type ServiceAccountToken, or if the referenced ServiceAccount does not exist, nil is returned
|
// Look up in cache
|
||||||
func (e *TokensController) getServiceAccount(secret *api.Secret, fetchOnCacheMiss bool) (*api.ServiceAccount, error) {
|
obj, exists, err := e.serviceAccounts.GetByKey(makeCacheKey(ns, name))
|
||||||
name, _ := serviceAccountNameAndUID(secret)
|
|
||||||
if len(name) == 0 {
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
key := &api.ServiceAccount{ObjectMeta: api.ObjectMeta{Namespace: secret.Namespace}}
|
|
||||||
namespaceAccounts, err := e.serviceAccounts.Index("namespace", key)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
if exists {
|
||||||
for _, obj := range namespaceAccounts {
|
sa, ok := obj.(*api.ServiceAccount)
|
||||||
serviceAccount := obj.(*api.ServiceAccount)
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("expected *api.ServiceAccount, got %#v", sa)
|
||||||
if serviceaccount.IsServiceAccountToken(secret, serviceAccount) {
|
}
|
||||||
return serviceAccount, nil
|
// Ensure UID matches if given
|
||||||
|
if len(uid) == 0 || uid == sa.UID {
|
||||||
|
return sa, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if fetchOnCacheMiss {
|
if !fetchOnCacheMiss {
|
||||||
serviceAccount, err := e.client.Core().ServiceAccounts(secret.Namespace).Get(name)
|
return nil, nil
|
||||||
if apierrors.IsNotFound(err) {
|
}
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if serviceaccount.IsServiceAccountToken(secret, serviceAccount) {
|
// Live lookup
|
||||||
return serviceAccount, nil
|
sa, err := e.client.Core().ServiceAccounts(ns).Get(name)
|
||||||
|
if apierrors.IsNotFound(err) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
// Ensure UID matches if given
|
||||||
|
if len(uid) == 0 || uid == sa.UID {
|
||||||
|
return sa, nil
|
||||||
|
}
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *TokensController) getSecret(ns string, name string, uid types.UID, fetchOnCacheMiss bool) (*api.Secret, error) {
|
||||||
|
// Look up in cache
|
||||||
|
obj, exists, err := e.secrets.GetByKey(makeCacheKey(ns, name))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if exists {
|
||||||
|
secret, ok := obj.(*api.Secret)
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("expected *api.Secret, got %#v", secret)
|
||||||
|
}
|
||||||
|
// Ensure UID matches if given
|
||||||
|
if len(uid) == 0 || uid == secret.UID {
|
||||||
|
return secret, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if !fetchOnCacheMiss {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Live lookup
|
||||||
|
secret, err := e.client.Core().Secrets(ns).Get(name)
|
||||||
|
if apierrors.IsNotFound(err) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
// Ensure UID matches if given
|
||||||
|
if len(uid) == 0 || uid == secret.UID {
|
||||||
|
return secret, nil
|
||||||
|
}
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// listTokenSecrets returns a list of all of the ServiceAccountToken secrets that
|
// listTokenSecrets returns a list of all of the ServiceAccountToken secrets that
|
||||||
// reference the given service account's name and uid
|
// reference the given service account's name and uid
|
||||||
func (e *TokensController) listTokenSecrets(serviceAccount *api.ServiceAccount) ([]*api.Secret, error) {
|
func (e *TokensController) listTokenSecrets(serviceAccount *api.ServiceAccount) ([]*api.Secret, error) {
|
||||||
key := &api.Secret{ObjectMeta: api.ObjectMeta{Namespace: serviceAccount.Namespace}}
|
namespaceSecrets, err := e.secrets.ByIndex("namespace", serviceAccount.Namespace)
|
||||||
namespaceSecrets, err := e.secrets.Index("namespace", key)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -523,3 +684,63 @@ func getSecretReferences(serviceAccount *api.ServiceAccount) sets.String {
|
|||||||
}
|
}
|
||||||
return references
|
return references
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// serviceAccountQueueKey holds information we need to sync a service account.
|
||||||
|
// It contains enough information to look up the cached service account,
|
||||||
|
// or delete owned tokens if the service account no longer exists.
|
||||||
|
type serviceAccountQueueKey struct {
|
||||||
|
namespace string
|
||||||
|
name string
|
||||||
|
uid types.UID
|
||||||
|
}
|
||||||
|
|
||||||
|
func makeServiceAccountKey(sa *api.ServiceAccount) interface{} {
|
||||||
|
return serviceAccountQueueKey{
|
||||||
|
namespace: sa.Namespace,
|
||||||
|
name: sa.Name,
|
||||||
|
uid: sa.UID,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func parseServiceAccountKey(key interface{}) (serviceAccountQueueKey, error) {
|
||||||
|
queueKey, ok := key.(serviceAccountQueueKey)
|
||||||
|
if !ok || len(queueKey.namespace) == 0 || len(queueKey.name) == 0 || len(queueKey.uid) == 0 {
|
||||||
|
return serviceAccountQueueKey{}, fmt.Errorf("invalid serviceaccount key: %#v", key)
|
||||||
|
}
|
||||||
|
return queueKey, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// secretQueueKey holds information we need to sync a service account token secret.
|
||||||
|
// It contains enough information to look up the cached service account,
|
||||||
|
// or delete the secret reference if the secret no longer exists.
|
||||||
|
type secretQueueKey struct {
|
||||||
|
namespace string
|
||||||
|
name string
|
||||||
|
uid types.UID
|
||||||
|
saName string
|
||||||
|
// optional, will be blank when syncing tokens missing the service account uid annotation
|
||||||
|
saUID types.UID
|
||||||
|
}
|
||||||
|
|
||||||
|
func makeSecretQueueKey(secret *api.Secret) interface{} {
|
||||||
|
return secretQueueKey{
|
||||||
|
namespace: secret.Namespace,
|
||||||
|
name: secret.Name,
|
||||||
|
uid: secret.UID,
|
||||||
|
saName: secret.Annotations[api.ServiceAccountNameKey],
|
||||||
|
saUID: types.UID(secret.Annotations[api.ServiceAccountUIDKey]),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func parseSecretQueueKey(key interface{}) (secretQueueKey, error) {
|
||||||
|
queueKey, ok := key.(secretQueueKey)
|
||||||
|
if !ok || len(queueKey.namespace) == 0 || len(queueKey.name) == 0 || len(queueKey.uid) == 0 || len(queueKey.saName) == 0 {
|
||||||
|
return secretQueueKey{}, fmt.Errorf("invalid secret key: %#v", key)
|
||||||
|
}
|
||||||
|
return queueKey, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// produce the same key format as cache.MetaNamespaceKeyFunc
|
||||||
|
func makeCacheKey(namespace, name string) string {
|
||||||
|
return namespace + "/" + name
|
||||||
|
}
|
||||||
|
@ -17,10 +17,15 @@ limitations under the License.
|
|||||||
package serviceaccount
|
package serviceaccount
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"errors"
|
||||||
"reflect"
|
"reflect"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/golang/glog"
|
||||||
|
|
||||||
"k8s.io/kubernetes/pkg/api"
|
"k8s.io/kubernetes/pkg/api"
|
||||||
|
apierrors "k8s.io/kubernetes/pkg/api/errors"
|
||||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||||
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
|
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
|
||||||
"k8s.io/kubernetes/pkg/client/testing/core"
|
"k8s.io/kubernetes/pkg/client/testing/core"
|
||||||
@ -63,7 +68,12 @@ func tokenSecretReferences() []api.ObjectReference {
|
|||||||
|
|
||||||
// addTokenSecretReference adds a reference to the ServiceAccountToken that will be created
|
// addTokenSecretReference adds a reference to the ServiceAccountToken that will be created
|
||||||
func addTokenSecretReference(refs []api.ObjectReference) []api.ObjectReference {
|
func addTokenSecretReference(refs []api.ObjectReference) []api.ObjectReference {
|
||||||
return append(refs, api.ObjectReference{Name: "default-token-fplln"})
|
return addNamedTokenSecretReference(refs, "default-token-fplln")
|
||||||
|
}
|
||||||
|
|
||||||
|
// addNamedTokenSecretReference adds a reference to the named ServiceAccountToken
|
||||||
|
func addNamedTokenSecretReference(refs []api.ObjectReference, name string) []api.ObjectReference {
|
||||||
|
return append(refs, api.ObjectReference{Name: name})
|
||||||
}
|
}
|
||||||
|
|
||||||
// serviceAccount returns a service account with the given secret refs
|
// serviceAccount returns a service account with the given secret refs
|
||||||
@ -104,10 +114,15 @@ func opaqueSecret() *api.Secret {
|
|||||||
|
|
||||||
// createdTokenSecret returns the ServiceAccountToken secret posted when creating a new token secret.
|
// createdTokenSecret returns the ServiceAccountToken secret posted when creating a new token secret.
|
||||||
// Named "default-token-fplln", since that is the first generated name after rand.Seed(1)
|
// Named "default-token-fplln", since that is the first generated name after rand.Seed(1)
|
||||||
func createdTokenSecret() *api.Secret {
|
func createdTokenSecret(overrideName ...string) *api.Secret {
|
||||||
|
return namedCreatedTokenSecret("default-token-fplln")
|
||||||
|
}
|
||||||
|
|
||||||
|
// namedTokenSecret returns the ServiceAccountToken secret posted when creating a new token secret with the given name.
|
||||||
|
func namedCreatedTokenSecret(name string) *api.Secret {
|
||||||
return &api.Secret{
|
return &api.Secret{
|
||||||
ObjectMeta: api.ObjectMeta{
|
ObjectMeta: api.ObjectMeta{
|
||||||
Name: "default-token-fplln",
|
Name: name,
|
||||||
Namespace: "default",
|
Namespace: "default",
|
||||||
Annotations: map[string]string{
|
Annotations: map[string]string{
|
||||||
api.ServiceAccountNameKey: "default",
|
api.ServiceAccountNameKey: "default",
|
||||||
@ -180,12 +195,20 @@ func serviceAccountTokenSecretWithNamespaceData(data []byte) *api.Secret {
|
|||||||
return secret
|
return secret
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type reaction struct {
|
||||||
|
verb string
|
||||||
|
resource string
|
||||||
|
reactor func(t *testing.T) core.ReactionFunc
|
||||||
|
}
|
||||||
|
|
||||||
func TestTokenCreation(t *testing.T) {
|
func TestTokenCreation(t *testing.T) {
|
||||||
testcases := map[string]struct {
|
testcases := map[string]struct {
|
||||||
ClientObjects []runtime.Object
|
ClientObjects []runtime.Object
|
||||||
|
|
||||||
SecretsSyncPending bool
|
IsAsync bool
|
||||||
ServiceAccountsSyncPending bool
|
MaxRetries int
|
||||||
|
|
||||||
|
Reactors []reaction
|
||||||
|
|
||||||
ExistingServiceAccount *api.ServiceAccount
|
ExistingServiceAccount *api.ServiceAccount
|
||||||
ExistingSecrets []*api.Secret
|
ExistingSecrets []*api.Secret
|
||||||
@ -209,16 +232,66 @@ func TestTokenCreation(t *testing.T) {
|
|||||||
core.NewUpdateAction(unversioned.GroupVersionResource{Resource: "serviceaccounts"}, api.NamespaceDefault, serviceAccount(addTokenSecretReference(emptySecretReferences()))),
|
core.NewUpdateAction(unversioned.GroupVersionResource{Resource: "serviceaccounts"}, api.NamespaceDefault, serviceAccount(addTokenSecretReference(emptySecretReferences()))),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
"new serviceaccount with no secrets with unsynced secret store": {
|
"new serviceaccount with no secrets encountering create error": {
|
||||||
ClientObjects: []runtime.Object{serviceAccount(emptySecretReferences()), createdTokenSecret()},
|
ClientObjects: []runtime.Object{serviceAccount(emptySecretReferences()), createdTokenSecret()},
|
||||||
|
MaxRetries: 10,
|
||||||
SecretsSyncPending: true,
|
IsAsync: true,
|
||||||
|
Reactors: []reaction{{
|
||||||
|
verb: "create",
|
||||||
|
resource: "secrets",
|
||||||
|
reactor: func(t *testing.T) core.ReactionFunc {
|
||||||
|
i := 0
|
||||||
|
return func(core.Action) (bool, runtime.Object, error) {
|
||||||
|
i++
|
||||||
|
if i < 3 {
|
||||||
|
return true, nil, apierrors.NewForbidden(api.Resource("secrets"), "foo", errors.New("No can do"))
|
||||||
|
}
|
||||||
|
return false, nil, nil
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}},
|
||||||
|
|
||||||
AddedServiceAccount: serviceAccount(emptySecretReferences()),
|
AddedServiceAccount: serviceAccount(emptySecretReferences()),
|
||||||
ExpectedActions: []core.Action{
|
ExpectedActions: []core.Action{
|
||||||
|
// Attempt 1
|
||||||
core.NewGetAction(unversioned.GroupVersionResource{Resource: "serviceaccounts"}, api.NamespaceDefault, "default"),
|
core.NewGetAction(unversioned.GroupVersionResource{Resource: "serviceaccounts"}, api.NamespaceDefault, "default"),
|
||||||
core.NewCreateAction(unversioned.GroupVersionResource{Resource: "secrets"}, api.NamespaceDefault, createdTokenSecret()),
|
core.NewCreateAction(unversioned.GroupVersionResource{Resource: "secrets"}, api.NamespaceDefault, createdTokenSecret()),
|
||||||
core.NewUpdateAction(unversioned.GroupVersionResource{Resource: "serviceaccounts"}, api.NamespaceDefault, serviceAccount(addTokenSecretReference(emptySecretReferences()))),
|
|
||||||
|
// Attempt 2
|
||||||
|
core.NewGetAction(unversioned.GroupVersionResource{Resource: "serviceaccounts"}, api.NamespaceDefault, "default"),
|
||||||
|
core.NewCreateAction(unversioned.GroupVersionResource{Resource: "secrets"}, api.NamespaceDefault, namedCreatedTokenSecret("default-token-gziey")),
|
||||||
|
|
||||||
|
// Attempt 3
|
||||||
|
core.NewGetAction(unversioned.GroupVersionResource{Resource: "serviceaccounts"}, api.NamespaceDefault, "default"),
|
||||||
|
core.NewCreateAction(unversioned.GroupVersionResource{Resource: "secrets"}, api.NamespaceDefault, namedCreatedTokenSecret("default-token-oh43e")),
|
||||||
|
core.NewUpdateAction(unversioned.GroupVersionResource{Resource: "serviceaccounts"}, api.NamespaceDefault, serviceAccount(addNamedTokenSecretReference(emptySecretReferences(), "default-token-oh43e"))),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"new serviceaccount with no secrets encountering unending create error": {
|
||||||
|
ClientObjects: []runtime.Object{serviceAccount(emptySecretReferences()), createdTokenSecret()},
|
||||||
|
MaxRetries: 2,
|
||||||
|
IsAsync: true,
|
||||||
|
Reactors: []reaction{{
|
||||||
|
verb: "create",
|
||||||
|
resource: "secrets",
|
||||||
|
reactor: func(t *testing.T) core.ReactionFunc {
|
||||||
|
return func(core.Action) (bool, runtime.Object, error) {
|
||||||
|
return true, nil, apierrors.NewForbidden(api.Resource("secrets"), "foo", errors.New("No can do"))
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}},
|
||||||
|
|
||||||
|
AddedServiceAccount: serviceAccount(emptySecretReferences()),
|
||||||
|
ExpectedActions: []core.Action{
|
||||||
|
// Attempt
|
||||||
|
core.NewGetAction(unversioned.GroupVersionResource{Resource: "serviceaccounts"}, api.NamespaceDefault, "default"),
|
||||||
|
core.NewCreateAction(unversioned.GroupVersionResource{Resource: "secrets"}, api.NamespaceDefault, createdTokenSecret()),
|
||||||
|
// Retry 1
|
||||||
|
core.NewGetAction(unversioned.GroupVersionResource{Resource: "serviceaccounts"}, api.NamespaceDefault, "default"),
|
||||||
|
core.NewCreateAction(unversioned.GroupVersionResource{Resource: "secrets"}, api.NamespaceDefault, namedCreatedTokenSecret("default-token-gziey")),
|
||||||
|
// Retry 2
|
||||||
|
core.NewGetAction(unversioned.GroupVersionResource{Resource: "serviceaccounts"}, api.NamespaceDefault, "default"),
|
||||||
|
core.NewCreateAction(unversioned.GroupVersionResource{Resource: "secrets"}, api.NamespaceDefault, namedCreatedTokenSecret("default-token-oh43e")),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
"new serviceaccount with missing secrets": {
|
"new serviceaccount with missing secrets": {
|
||||||
@ -231,14 +304,6 @@ func TestTokenCreation(t *testing.T) {
|
|||||||
core.NewUpdateAction(unversioned.GroupVersionResource{Resource: "serviceaccounts"}, api.NamespaceDefault, serviceAccount(addTokenSecretReference(missingSecretReferences()))),
|
core.NewUpdateAction(unversioned.GroupVersionResource{Resource: "serviceaccounts"}, api.NamespaceDefault, serviceAccount(addTokenSecretReference(missingSecretReferences()))),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
"new serviceaccount with missing secrets with unsynced secret store": {
|
|
||||||
ClientObjects: []runtime.Object{serviceAccount(missingSecretReferences()), createdTokenSecret()},
|
|
||||||
|
|
||||||
SecretsSyncPending: true,
|
|
||||||
|
|
||||||
AddedServiceAccount: serviceAccount(missingSecretReferences()),
|
|
||||||
ExpectedActions: []core.Action{},
|
|
||||||
},
|
|
||||||
"new serviceaccount with non-token secrets": {
|
"new serviceaccount with non-token secrets": {
|
||||||
ClientObjects: []runtime.Object{serviceAccount(regularSecretReferences()), createdTokenSecret(), opaqueSecret()},
|
ClientObjects: []runtime.Object{serviceAccount(regularSecretReferences()), createdTokenSecret(), opaqueSecret()},
|
||||||
|
|
||||||
@ -275,18 +340,6 @@ func TestTokenCreation(t *testing.T) {
|
|||||||
core.NewUpdateAction(unversioned.GroupVersionResource{Resource: "serviceaccounts"}, api.NamespaceDefault, serviceAccount(addTokenSecretReference(emptySecretReferences()))),
|
core.NewUpdateAction(unversioned.GroupVersionResource{Resource: "serviceaccounts"}, api.NamespaceDefault, serviceAccount(addTokenSecretReference(emptySecretReferences()))),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
"updated serviceaccount with no secrets with unsynced secret store": {
|
|
||||||
ClientObjects: []runtime.Object{serviceAccount(emptySecretReferences()), createdTokenSecret()},
|
|
||||||
|
|
||||||
SecretsSyncPending: true,
|
|
||||||
|
|
||||||
UpdatedServiceAccount: serviceAccount(emptySecretReferences()),
|
|
||||||
ExpectedActions: []core.Action{
|
|
||||||
core.NewGetAction(unversioned.GroupVersionResource{Resource: "serviceaccounts"}, api.NamespaceDefault, "default"),
|
|
||||||
core.NewCreateAction(unversioned.GroupVersionResource{Resource: "secrets"}, api.NamespaceDefault, createdTokenSecret()),
|
|
||||||
core.NewUpdateAction(unversioned.GroupVersionResource{Resource: "serviceaccounts"}, api.NamespaceDefault, serviceAccount(addTokenSecretReference(emptySecretReferences()))),
|
|
||||||
},
|
|
||||||
},
|
|
||||||
"updated serviceaccount with missing secrets": {
|
"updated serviceaccount with missing secrets": {
|
||||||
ClientObjects: []runtime.Object{serviceAccount(missingSecretReferences()), createdTokenSecret()},
|
ClientObjects: []runtime.Object{serviceAccount(missingSecretReferences()), createdTokenSecret()},
|
||||||
|
|
||||||
@ -297,14 +350,6 @@ func TestTokenCreation(t *testing.T) {
|
|||||||
core.NewUpdateAction(unversioned.GroupVersionResource{Resource: "serviceaccounts"}, api.NamespaceDefault, serviceAccount(addTokenSecretReference(missingSecretReferences()))),
|
core.NewUpdateAction(unversioned.GroupVersionResource{Resource: "serviceaccounts"}, api.NamespaceDefault, serviceAccount(addTokenSecretReference(missingSecretReferences()))),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
"updated serviceaccount with missing secrets with unsynced secret store": {
|
|
||||||
ClientObjects: []runtime.Object{serviceAccount(missingSecretReferences()), createdTokenSecret()},
|
|
||||||
|
|
||||||
SecretsSyncPending: true,
|
|
||||||
|
|
||||||
UpdatedServiceAccount: serviceAccount(missingSecretReferences()),
|
|
||||||
ExpectedActions: []core.Action{},
|
|
||||||
},
|
|
||||||
"updated serviceaccount with non-token secrets": {
|
"updated serviceaccount with non-token secrets": {
|
||||||
ClientObjects: []runtime.Object{serviceAccount(regularSecretReferences()), createdTokenSecret(), opaqueSecret()},
|
ClientObjects: []runtime.Object{serviceAccount(regularSecretReferences()), createdTokenSecret(), opaqueSecret()},
|
||||||
|
|
||||||
@ -375,6 +420,7 @@ func TestTokenCreation(t *testing.T) {
|
|||||||
|
|
||||||
AddedSecret: serviceAccountTokenSecretWithoutTokenData(),
|
AddedSecret: serviceAccountTokenSecretWithoutTokenData(),
|
||||||
ExpectedActions: []core.Action{
|
ExpectedActions: []core.Action{
|
||||||
|
core.NewGetAction(unversioned.GroupVersionResource{Resource: "secrets"}, api.NamespaceDefault, "token-secret-1"),
|
||||||
core.NewUpdateAction(unversioned.GroupVersionResource{Resource: "secrets"}, api.NamespaceDefault, serviceAccountTokenSecret()),
|
core.NewUpdateAction(unversioned.GroupVersionResource{Resource: "secrets"}, api.NamespaceDefault, serviceAccountTokenSecret()),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -384,6 +430,7 @@ func TestTokenCreation(t *testing.T) {
|
|||||||
|
|
||||||
AddedSecret: serviceAccountTokenSecretWithoutCAData(),
|
AddedSecret: serviceAccountTokenSecretWithoutCAData(),
|
||||||
ExpectedActions: []core.Action{
|
ExpectedActions: []core.Action{
|
||||||
|
core.NewGetAction(unversioned.GroupVersionResource{Resource: "secrets"}, api.NamespaceDefault, "token-secret-1"),
|
||||||
core.NewUpdateAction(unversioned.GroupVersionResource{Resource: "secrets"}, api.NamespaceDefault, serviceAccountTokenSecret()),
|
core.NewUpdateAction(unversioned.GroupVersionResource{Resource: "secrets"}, api.NamespaceDefault, serviceAccountTokenSecret()),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -393,6 +440,7 @@ func TestTokenCreation(t *testing.T) {
|
|||||||
|
|
||||||
AddedSecret: serviceAccountTokenSecretWithCAData([]byte("mismatched")),
|
AddedSecret: serviceAccountTokenSecretWithCAData([]byte("mismatched")),
|
||||||
ExpectedActions: []core.Action{
|
ExpectedActions: []core.Action{
|
||||||
|
core.NewGetAction(unversioned.GroupVersionResource{Resource: "secrets"}, api.NamespaceDefault, "token-secret-1"),
|
||||||
core.NewUpdateAction(unversioned.GroupVersionResource{Resource: "secrets"}, api.NamespaceDefault, serviceAccountTokenSecret()),
|
core.NewUpdateAction(unversioned.GroupVersionResource{Resource: "secrets"}, api.NamespaceDefault, serviceAccountTokenSecret()),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -402,6 +450,7 @@ func TestTokenCreation(t *testing.T) {
|
|||||||
|
|
||||||
AddedSecret: serviceAccountTokenSecretWithoutNamespaceData(),
|
AddedSecret: serviceAccountTokenSecretWithoutNamespaceData(),
|
||||||
ExpectedActions: []core.Action{
|
ExpectedActions: []core.Action{
|
||||||
|
core.NewGetAction(unversioned.GroupVersionResource{Resource: "secrets"}, api.NamespaceDefault, "token-secret-1"),
|
||||||
core.NewUpdateAction(unversioned.GroupVersionResource{Resource: "secrets"}, api.NamespaceDefault, serviceAccountTokenSecret()),
|
core.NewUpdateAction(unversioned.GroupVersionResource{Resource: "secrets"}, api.NamespaceDefault, serviceAccountTokenSecret()),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -436,6 +485,7 @@ func TestTokenCreation(t *testing.T) {
|
|||||||
|
|
||||||
UpdatedSecret: serviceAccountTokenSecretWithoutTokenData(),
|
UpdatedSecret: serviceAccountTokenSecretWithoutTokenData(),
|
||||||
ExpectedActions: []core.Action{
|
ExpectedActions: []core.Action{
|
||||||
|
core.NewGetAction(unversioned.GroupVersionResource{Resource: "secrets"}, api.NamespaceDefault, "token-secret-1"),
|
||||||
core.NewUpdateAction(unversioned.GroupVersionResource{Resource: "secrets"}, api.NamespaceDefault, serviceAccountTokenSecret()),
|
core.NewUpdateAction(unversioned.GroupVersionResource{Resource: "secrets"}, api.NamespaceDefault, serviceAccountTokenSecret()),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -445,6 +495,7 @@ func TestTokenCreation(t *testing.T) {
|
|||||||
|
|
||||||
UpdatedSecret: serviceAccountTokenSecretWithoutCAData(),
|
UpdatedSecret: serviceAccountTokenSecretWithoutCAData(),
|
||||||
ExpectedActions: []core.Action{
|
ExpectedActions: []core.Action{
|
||||||
|
core.NewGetAction(unversioned.GroupVersionResource{Resource: "secrets"}, api.NamespaceDefault, "token-secret-1"),
|
||||||
core.NewUpdateAction(unversioned.GroupVersionResource{Resource: "secrets"}, api.NamespaceDefault, serviceAccountTokenSecret()),
|
core.NewUpdateAction(unversioned.GroupVersionResource{Resource: "secrets"}, api.NamespaceDefault, serviceAccountTokenSecret()),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -454,6 +505,7 @@ func TestTokenCreation(t *testing.T) {
|
|||||||
|
|
||||||
UpdatedSecret: serviceAccountTokenSecretWithCAData([]byte("mismatched")),
|
UpdatedSecret: serviceAccountTokenSecretWithCAData([]byte("mismatched")),
|
||||||
ExpectedActions: []core.Action{
|
ExpectedActions: []core.Action{
|
||||||
|
core.NewGetAction(unversioned.GroupVersionResource{Resource: "secrets"}, api.NamespaceDefault, "token-secret-1"),
|
||||||
core.NewUpdateAction(unversioned.GroupVersionResource{Resource: "secrets"}, api.NamespaceDefault, serviceAccountTokenSecret()),
|
core.NewUpdateAction(unversioned.GroupVersionResource{Resource: "secrets"}, api.NamespaceDefault, serviceAccountTokenSecret()),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -463,6 +515,7 @@ func TestTokenCreation(t *testing.T) {
|
|||||||
|
|
||||||
UpdatedSecret: serviceAccountTokenSecretWithoutNamespaceData(),
|
UpdatedSecret: serviceAccountTokenSecretWithoutNamespaceData(),
|
||||||
ExpectedActions: []core.Action{
|
ExpectedActions: []core.Action{
|
||||||
|
core.NewGetAction(unversioned.GroupVersionResource{Resource: "secrets"}, api.NamespaceDefault, "token-secret-1"),
|
||||||
core.NewUpdateAction(unversioned.GroupVersionResource{Resource: "secrets"}, api.NamespaceDefault, serviceAccountTokenSecret()),
|
core.NewUpdateAction(unversioned.GroupVersionResource{Resource: "secrets"}, api.NamespaceDefault, serviceAccountTokenSecret()),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -501,6 +554,7 @@ func TestTokenCreation(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for k, tc := range testcases {
|
for k, tc := range testcases {
|
||||||
|
glog.Infof(k)
|
||||||
|
|
||||||
// Re-seed to reset name generation
|
// Re-seed to reset name generation
|
||||||
utilrand.Seed(1)
|
utilrand.Seed(1)
|
||||||
@ -508,12 +562,11 @@ func TestTokenCreation(t *testing.T) {
|
|||||||
generator := &testGenerator{Token: "ABC"}
|
generator := &testGenerator{Token: "ABC"}
|
||||||
|
|
||||||
client := fake.NewSimpleClientset(tc.ClientObjects...)
|
client := fake.NewSimpleClientset(tc.ClientObjects...)
|
||||||
|
for _, reactor := range tc.Reactors {
|
||||||
|
client.Fake.PrependReactor(reactor.verb, reactor.resource, reactor.reactor(t))
|
||||||
|
}
|
||||||
|
|
||||||
controller := NewTokensController(client, TokensControllerOptions{TokenGenerator: generator, RootCA: []byte("CA Data")})
|
controller := NewTokensController(client, TokensControllerOptions{TokenGenerator: generator, RootCA: []byte("CA Data"), MaxRetries: tc.MaxRetries})
|
||||||
|
|
||||||
// Tell the token controller whether its stores have been synced
|
|
||||||
controller.serviceAccountsSynced = func() bool { return !tc.ServiceAccountsSyncPending }
|
|
||||||
controller.secretsSynced = func() bool { return !tc.SecretsSyncPending }
|
|
||||||
|
|
||||||
if tc.ExistingServiceAccount != nil {
|
if tc.ExistingServiceAccount != nil {
|
||||||
controller.serviceAccounts.Add(tc.ExistingServiceAccount)
|
controller.serviceAccounts.Add(tc.ExistingServiceAccount)
|
||||||
@ -523,22 +576,72 @@ func TestTokenCreation(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if tc.AddedServiceAccount != nil {
|
if tc.AddedServiceAccount != nil {
|
||||||
controller.serviceAccountAdded(tc.AddedServiceAccount)
|
controller.serviceAccounts.Add(tc.AddedServiceAccount)
|
||||||
|
controller.queueServiceAccountSync(tc.AddedServiceAccount)
|
||||||
}
|
}
|
||||||
if tc.UpdatedServiceAccount != nil {
|
if tc.UpdatedServiceAccount != nil {
|
||||||
controller.serviceAccountUpdated(nil, tc.UpdatedServiceAccount)
|
controller.serviceAccounts.Add(tc.UpdatedServiceAccount)
|
||||||
|
controller.queueServiceAccountUpdateSync(nil, tc.UpdatedServiceAccount)
|
||||||
}
|
}
|
||||||
if tc.DeletedServiceAccount != nil {
|
if tc.DeletedServiceAccount != nil {
|
||||||
controller.serviceAccountDeleted(tc.DeletedServiceAccount)
|
controller.serviceAccounts.Delete(tc.DeletedServiceAccount)
|
||||||
|
controller.queueServiceAccountSync(tc.DeletedServiceAccount)
|
||||||
}
|
}
|
||||||
if tc.AddedSecret != nil {
|
if tc.AddedSecret != nil {
|
||||||
controller.secretAdded(tc.AddedSecret)
|
controller.secrets.Add(tc.AddedSecret)
|
||||||
|
controller.queueSecretSync(tc.AddedSecret)
|
||||||
}
|
}
|
||||||
if tc.UpdatedSecret != nil {
|
if tc.UpdatedSecret != nil {
|
||||||
controller.secretUpdated(nil, tc.UpdatedSecret)
|
controller.secrets.Add(tc.UpdatedSecret)
|
||||||
|
controller.queueSecretUpdateSync(nil, tc.UpdatedSecret)
|
||||||
}
|
}
|
||||||
if tc.DeletedSecret != nil {
|
if tc.DeletedSecret != nil {
|
||||||
controller.secretDeleted(tc.DeletedSecret)
|
controller.secrets.Delete(tc.DeletedSecret)
|
||||||
|
controller.queueSecretSync(tc.DeletedSecret)
|
||||||
|
}
|
||||||
|
|
||||||
|
// This is the longest we'll wait for async tests
|
||||||
|
timeout := time.Now().Add(30 * time.Second)
|
||||||
|
waitedForAdditionalActions := false
|
||||||
|
|
||||||
|
for {
|
||||||
|
if controller.syncServiceAccountQueue.Len() > 0 {
|
||||||
|
controller.syncServiceAccount()
|
||||||
|
}
|
||||||
|
if controller.syncSecretQueue.Len() > 0 {
|
||||||
|
controller.syncSecret()
|
||||||
|
}
|
||||||
|
|
||||||
|
// The queues still have things to work on
|
||||||
|
if controller.syncServiceAccountQueue.Len() > 0 || controller.syncSecretQueue.Len() > 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// If we expect this test to work asynchronously...
|
||||||
|
if tc.IsAsync {
|
||||||
|
// if we're still missing expected actions within our test timeout
|
||||||
|
if len(client.Actions()) < len(tc.ExpectedActions) && time.Now().Before(timeout) {
|
||||||
|
// wait for the expected actions (without hotlooping)
|
||||||
|
time.Sleep(time.Millisecond)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// if we exactly match our expected actions, wait a bit to make sure no other additional actions show up
|
||||||
|
if len(client.Actions()) == len(tc.ExpectedActions) && !waitedForAdditionalActions {
|
||||||
|
time.Sleep(time.Second)
|
||||||
|
waitedForAdditionalActions = true
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
if controller.syncServiceAccountQueue.Len() > 0 {
|
||||||
|
t.Errorf("%s: unexpected items in service account queue: %d", k, controller.syncServiceAccountQueue.Len())
|
||||||
|
}
|
||||||
|
if controller.syncSecretQueue.Len() > 0 {
|
||||||
|
t.Errorf("%s: unexpected items in secret queue: %d", k, controller.syncSecretQueue.Len())
|
||||||
}
|
}
|
||||||
|
|
||||||
actions := client.Actions()
|
actions := client.Actions()
|
||||||
@ -556,7 +659,10 @@ func TestTokenCreation(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if len(tc.ExpectedActions) > len(actions) {
|
if len(tc.ExpectedActions) > len(actions) {
|
||||||
t.Errorf("%s: %d additional expected actions:%+v", k, len(tc.ExpectedActions)-len(actions), tc.ExpectedActions[len(actions):])
|
t.Errorf("%s: %d additional expected actions", k, len(tc.ExpectedActions)-len(actions))
|
||||||
|
for _, a := range tc.ExpectedActions[len(actions):] {
|
||||||
|
t.Logf(" %+v", a)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -415,15 +415,16 @@ func startServiceAccountTestServer(t *testing.T) (*clientset.Clientset, restclie
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Start the service account and service account token controllers
|
// Start the service account and service account token controllers
|
||||||
|
stopCh := make(chan struct{})
|
||||||
tokenController := serviceaccountcontroller.NewTokensController(rootClientset, serviceaccountcontroller.TokensControllerOptions{TokenGenerator: serviceaccount.JWTTokenGenerator(serviceAccountKey)})
|
tokenController := serviceaccountcontroller.NewTokensController(rootClientset, serviceaccountcontroller.TokensControllerOptions{TokenGenerator: serviceaccount.JWTTokenGenerator(serviceAccountKey)})
|
||||||
tokenController.Run()
|
go tokenController.Run(1, stopCh)
|
||||||
serviceAccountController := serviceaccountcontroller.NewServiceAccountsController(rootClientset, serviceaccountcontroller.DefaultServiceAccountsControllerOptions())
|
serviceAccountController := serviceaccountcontroller.NewServiceAccountsController(rootClientset, serviceaccountcontroller.DefaultServiceAccountsControllerOptions())
|
||||||
serviceAccountController.Run()
|
serviceAccountController.Run()
|
||||||
// Start the admission plugin reflectors
|
// Start the admission plugin reflectors
|
||||||
serviceAccountAdmission.Run()
|
serviceAccountAdmission.Run()
|
||||||
|
|
||||||
stop := func() {
|
stop := func() {
|
||||||
tokenController.Stop()
|
close(stopCh)
|
||||||
serviceAccountController.Stop()
|
serviceAccountController.Stop()
|
||||||
serviceAccountAdmission.Stop()
|
serviceAccountAdmission.Stop()
|
||||||
apiServer.Close()
|
apiServer.Close()
|
||||||
|
Loading…
Reference in New Issue
Block a user