mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-28 14:07:14 +00:00
Switch the tokens controller to use shared informers
Tokens controller previously needed a bit of extra help in order to be safe for concurrent use. The new MutationCache allows it to keep a local cache and still use a shared informer. The filtering event handler lets it only see changes to secrets it cares about.
This commit is contained in:
parent
5ac3214c42
commit
784e3ae5fa
@ -397,14 +397,20 @@ func StartControllers(controllers map[string]InitFunc, s *options.CMServer, root
|
|||||||
rootCA = rootClientBuilder.ConfigOrDie("tokens-controller").CAData
|
rootCA = rootClientBuilder.ConfigOrDie("tokens-controller").CAData
|
||||||
}
|
}
|
||||||
|
|
||||||
go serviceaccountcontroller.NewTokensController(
|
controller := serviceaccountcontroller.NewTokensController(
|
||||||
|
sharedInformers.Core().V1().ServiceAccounts(),
|
||||||
|
sharedInformers.Core().V1().Secrets(),
|
||||||
rootClientBuilder.ClientOrDie("tokens-controller"),
|
rootClientBuilder.ClientOrDie("tokens-controller"),
|
||||||
serviceaccountcontroller.TokensControllerOptions{
|
serviceaccountcontroller.TokensControllerOptions{
|
||||||
TokenGenerator: serviceaccount.JWTTokenGenerator(privateKey),
|
TokenGenerator: serviceaccount.JWTTokenGenerator(privateKey),
|
||||||
RootCA: rootCA,
|
RootCA: rootCA,
|
||||||
},
|
},
|
||||||
).Run(int(s.ConcurrentSATokenSyncs), stop)
|
)
|
||||||
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
|
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
|
||||||
|
go controller.Run(int(s.ConcurrentSATokenSyncs), stop)
|
||||||
|
|
||||||
|
// start the first set of informers now so that other controllers can start
|
||||||
|
sharedInformers.Start(stop)
|
||||||
}
|
}
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
|
@ -24,20 +24,19 @@ import (
|
|||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/fields"
|
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
|
||||||
"k8s.io/apimachinery/pkg/types"
|
"k8s.io/apimachinery/pkg/types"
|
||||||
utilerrors "k8s.io/apimachinery/pkg/util/errors"
|
utilerrors "k8s.io/apimachinery/pkg/util/errors"
|
||||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||||
"k8s.io/apimachinery/pkg/util/sets"
|
"k8s.io/apimachinery/pkg/util/sets"
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
"k8s.io/apimachinery/pkg/watch"
|
|
||||||
"k8s.io/client-go/tools/cache"
|
"k8s.io/client-go/tools/cache"
|
||||||
"k8s.io/client-go/util/workqueue"
|
"k8s.io/client-go/util/workqueue"
|
||||||
"k8s.io/kubernetes/pkg/api"
|
|
||||||
"k8s.io/kubernetes/pkg/api/v1"
|
"k8s.io/kubernetes/pkg/api/v1"
|
||||||
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
|
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
|
||||||
|
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/core/v1"
|
||||||
|
listersv1 "k8s.io/kubernetes/pkg/client/listers/core/v1"
|
||||||
clientretry "k8s.io/kubernetes/pkg/client/retry"
|
clientretry "k8s.io/kubernetes/pkg/client/retry"
|
||||||
|
"k8s.io/kubernetes/pkg/controller"
|
||||||
"k8s.io/kubernetes/pkg/registry/core/secret"
|
"k8s.io/kubernetes/pkg/registry/core/secret"
|
||||||
"k8s.io/kubernetes/pkg/serviceaccount"
|
"k8s.io/kubernetes/pkg/serviceaccount"
|
||||||
"k8s.io/kubernetes/pkg/util/metrics"
|
"k8s.io/kubernetes/pkg/util/metrics"
|
||||||
@ -71,7 +70,7 @@ type TokensControllerOptions struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// NewTokensController returns a new *TokensController.
|
// NewTokensController returns a new *TokensController.
|
||||||
func NewTokensController(cl clientset.Interface, options TokensControllerOptions) *TokensController {
|
func NewTokensController(serviceAccounts informers.ServiceAccountInformer, secrets informers.SecretInformer, cl clientset.Interface, options TokensControllerOptions) *TokensController {
|
||||||
maxRetries := options.MaxRetries
|
maxRetries := options.MaxRetries
|
||||||
if maxRetries == 0 {
|
if maxRetries == 0 {
|
||||||
maxRetries = 10
|
maxRetries = 10
|
||||||
@ -91,44 +90,38 @@ func NewTokensController(cl clientset.Interface, options TokensControllerOptions
|
|||||||
metrics.RegisterMetricAndTrackRateLimiterUsage("serviceaccount_controller", cl.Core().RESTClient().GetRateLimiter())
|
metrics.RegisterMetricAndTrackRateLimiterUsage("serviceaccount_controller", cl.Core().RESTClient().GetRateLimiter())
|
||||||
}
|
}
|
||||||
|
|
||||||
e.serviceAccounts, e.serviceAccountController = cache.NewInformer(
|
e.serviceAccounts = serviceAccounts.Lister()
|
||||||
&cache.ListWatch{
|
e.serviceAccountSynced = serviceAccounts.Informer().HasSynced
|
||||||
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
|
serviceAccounts.Informer().AddEventHandlerWithResyncPeriod(
|
||||||
return e.client.Core().ServiceAccounts(metav1.NamespaceAll).List(options)
|
|
||||||
},
|
|
||||||
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
|
|
||||||
return e.client.Core().ServiceAccounts(metav1.NamespaceAll).Watch(options)
|
|
||||||
},
|
|
||||||
},
|
|
||||||
&v1.ServiceAccount{},
|
|
||||||
options.ServiceAccountResync,
|
|
||||||
cache.ResourceEventHandlerFuncs{
|
cache.ResourceEventHandlerFuncs{
|
||||||
AddFunc: e.queueServiceAccountSync,
|
AddFunc: e.queueServiceAccountSync,
|
||||||
UpdateFunc: e.queueServiceAccountUpdateSync,
|
UpdateFunc: e.queueServiceAccountUpdateSync,
|
||||||
DeleteFunc: e.queueServiceAccountSync,
|
DeleteFunc: e.queueServiceAccountSync,
|
||||||
},
|
},
|
||||||
|
options.ServiceAccountResync,
|
||||||
)
|
)
|
||||||
|
|
||||||
tokenSelector := fields.SelectorFromSet(map[string]string{api.SecretTypeField: string(v1.SecretTypeServiceAccountToken)})
|
secretCache := secrets.Informer().GetIndexer()
|
||||||
e.secrets, e.secretController = cache.NewIndexerInformer(
|
e.updatedSecrets = cache.NewIntegerResourceVersionMutationCache(secretCache, secretCache, 60*time.Second, true)
|
||||||
&cache.ListWatch{
|
e.secretSynced = secrets.Informer().HasSynced
|
||||||
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
|
secrets.Informer().AddEventHandlerWithResyncPeriod(
|
||||||
options.FieldSelector = tokenSelector.String()
|
cache.FilteringResourceEventHandler{
|
||||||
return e.client.Core().Secrets(metav1.NamespaceAll).List(options)
|
FilterFunc: func(obj interface{}) bool {
|
||||||
|
switch t := obj.(type) {
|
||||||
|
case *v1.Secret:
|
||||||
|
return t.Type == v1.SecretTypeServiceAccountToken
|
||||||
|
default:
|
||||||
|
utilruntime.HandleError(fmt.Errorf("object passed to %T that is not expected: %T", e, obj))
|
||||||
|
return false
|
||||||
|
}
|
||||||
},
|
},
|
||||||
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
|
Handler: cache.ResourceEventHandlerFuncs{
|
||||||
options.FieldSelector = tokenSelector.String()
|
|
||||||
return e.client.Core().Secrets(metav1.NamespaceAll).Watch(options)
|
|
||||||
},
|
|
||||||
},
|
|
||||||
&v1.Secret{},
|
|
||||||
options.SecretResync,
|
|
||||||
cache.ResourceEventHandlerFuncs{
|
|
||||||
AddFunc: e.queueSecretSync,
|
AddFunc: e.queueSecretSync,
|
||||||
UpdateFunc: e.queueSecretUpdateSync,
|
UpdateFunc: e.queueSecretUpdateSync,
|
||||||
DeleteFunc: e.queueSecretSync,
|
DeleteFunc: e.queueSecretSync,
|
||||||
},
|
},
|
||||||
cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc},
|
},
|
||||||
|
options.SecretResync,
|
||||||
)
|
)
|
||||||
|
|
||||||
return e
|
return e
|
||||||
@ -141,12 +134,15 @@ type TokensController struct {
|
|||||||
|
|
||||||
rootCA []byte
|
rootCA []byte
|
||||||
|
|
||||||
serviceAccounts cache.Store
|
serviceAccounts listersv1.ServiceAccountLister
|
||||||
secrets cache.Indexer
|
// updatedSecrets is a wrapper around the shared cache which allows us to record
|
||||||
|
// and return our local mutations (since we're very likely to act on an updated
|
||||||
|
// secret before the watch reports it).
|
||||||
|
updatedSecrets cache.MutationCache
|
||||||
|
|
||||||
// Since we join two objects, we'll watch both of them with controllers.
|
// Since we join two objects, we'll watch both of them with controllers.
|
||||||
serviceAccountController cache.Controller
|
serviceAccountSynced cache.InformerSynced
|
||||||
secretController cache.Controller
|
secretSynced cache.InformerSynced
|
||||||
|
|
||||||
// syncServiceAccountQueue handles service account events:
|
// syncServiceAccountQueue handles service account events:
|
||||||
// * ensures a referenced token exists for service accounts which still exist
|
// * ensures a referenced token exists for service accounts which still exist
|
||||||
@ -166,29 +162,22 @@ type TokensController struct {
|
|||||||
|
|
||||||
// Runs controller blocks until stopCh is closed
|
// Runs controller blocks until stopCh is closed
|
||||||
func (e *TokensController) Run(workers int, stopCh <-chan struct{}) {
|
func (e *TokensController) Run(workers int, stopCh <-chan struct{}) {
|
||||||
|
// Shut down queues
|
||||||
defer utilruntime.HandleCrash()
|
defer utilruntime.HandleCrash()
|
||||||
|
defer e.syncServiceAccountQueue.ShutDown()
|
||||||
|
defer e.syncSecretQueue.ShutDown()
|
||||||
|
|
||||||
// Start controllers (to fill stores, call informers, fill work queues)
|
if !controller.WaitForCacheSync("tokens", stopCh, e.serviceAccountSynced, e.secretSynced) {
|
||||||
go e.serviceAccountController.Run(stopCh)
|
return
|
||||||
go e.secretController.Run(stopCh)
|
|
||||||
|
|
||||||
// Wait for stores to fill
|
|
||||||
for !e.serviceAccountController.HasSynced() || !e.secretController.HasSynced() {
|
|
||||||
time.Sleep(100 * time.Millisecond)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Spawn workers to process work queues
|
glog.V(5).Infof("Starting workers")
|
||||||
for i := 0; i < workers; i++ {
|
for i := 0; i < workers; i++ {
|
||||||
go wait.Until(e.syncServiceAccount, 0, stopCh)
|
go wait.Until(e.syncServiceAccount, 0, stopCh)
|
||||||
go wait.Until(e.syncSecret, 0, stopCh)
|
go wait.Until(e.syncSecret, 0, stopCh)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Block until stop channel is closed
|
|
||||||
<-stopCh
|
<-stopCh
|
||||||
|
glog.V(1).Infof("Shutting down")
|
||||||
// Shut down queues
|
|
||||||
e.syncServiceAccountQueue.ShutDown()
|
|
||||||
e.syncSecretQueue.ShutDown()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *TokensController) queueServiceAccountSync(obj interface{}) {
|
func (e *TokensController) queueServiceAccountSync(obj interface{}) {
|
||||||
@ -423,7 +412,7 @@ func (e *TokensController) ensureReferencedToken(serviceAccount *v1.ServiceAccou
|
|||||||
}
|
}
|
||||||
// Manually add the new token to the cache store.
|
// Manually add the new token to the cache store.
|
||||||
// This prevents the service account update (below) triggering another token creation, if the referenced token couldn't be found in the store
|
// This prevents the service account update (below) triggering another token creation, if the referenced token couldn't be found in the store
|
||||||
e.secrets.Add(createdToken)
|
e.updatedSecrets.Mutation(createdToken)
|
||||||
|
|
||||||
// Try to add a reference to the newly created token to the service account
|
// Try to add a reference to the newly created token to the service account
|
||||||
addedReference := false
|
addedReference := false
|
||||||
@ -626,15 +615,11 @@ func (e *TokensController) removeSecretReference(saNamespace string, saName stri
|
|||||||
|
|
||||||
func (e *TokensController) getServiceAccount(ns string, name string, uid types.UID, fetchOnCacheMiss bool) (*v1.ServiceAccount, error) {
|
func (e *TokensController) getServiceAccount(ns string, name string, uid types.UID, fetchOnCacheMiss bool) (*v1.ServiceAccount, error) {
|
||||||
// Look up in cache
|
// Look up in cache
|
||||||
obj, exists, err := e.serviceAccounts.GetByKey(makeCacheKey(ns, name))
|
sa, err := e.serviceAccounts.ServiceAccounts(ns).Get(name)
|
||||||
if err != nil {
|
if err != nil && !apierrors.IsNotFound(err) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if exists {
|
if sa != nil {
|
||||||
sa, ok := obj.(*v1.ServiceAccount)
|
|
||||||
if !ok {
|
|
||||||
return nil, fmt.Errorf("expected *v1.ServiceAccount, got %#v", sa)
|
|
||||||
}
|
|
||||||
// Ensure UID matches if given
|
// Ensure UID matches if given
|
||||||
if len(uid) == 0 || uid == sa.UID {
|
if len(uid) == 0 || uid == sa.UID {
|
||||||
return sa, nil
|
return sa, nil
|
||||||
@ -646,7 +631,7 @@ func (e *TokensController) getServiceAccount(ns string, name string, uid types.U
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Live lookup
|
// Live lookup
|
||||||
sa, err := e.client.Core().ServiceAccounts(ns).Get(name, metav1.GetOptions{})
|
sa, err = e.client.Core().ServiceAccounts(ns).Get(name, metav1.GetOptions{})
|
||||||
if apierrors.IsNotFound(err) {
|
if apierrors.IsNotFound(err) {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
@ -662,7 +647,7 @@ func (e *TokensController) getServiceAccount(ns string, name string, uid types.U
|
|||||||
|
|
||||||
func (e *TokensController) getSecret(ns string, name string, uid types.UID, fetchOnCacheMiss bool) (*v1.Secret, error) {
|
func (e *TokensController) getSecret(ns string, name string, uid types.UID, fetchOnCacheMiss bool) (*v1.Secret, error) {
|
||||||
// Look up in cache
|
// Look up in cache
|
||||||
obj, exists, err := e.secrets.GetByKey(makeCacheKey(ns, name))
|
obj, exists, err := e.updatedSecrets.GetByKey(makeCacheKey(ns, name))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -699,7 +684,7 @@ func (e *TokensController) getSecret(ns string, name string, uid types.UID, fetc
|
|||||||
// listTokenSecrets returns a list of all of the ServiceAccountToken secrets that
|
// listTokenSecrets returns a list of all of the ServiceAccountToken secrets that
|
||||||
// reference the given service account's name and uid
|
// reference the given service account's name and uid
|
||||||
func (e *TokensController) listTokenSecrets(serviceAccount *v1.ServiceAccount) ([]*v1.Secret, error) {
|
func (e *TokensController) listTokenSecrets(serviceAccount *v1.ServiceAccount) ([]*v1.Secret, error) {
|
||||||
namespaceSecrets, err := e.secrets.ByIndex("namespace", serviceAccount.Namespace)
|
namespaceSecrets, err := e.updatedSecrets.ByIndex("namespace", serviceAccount.Namespace)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -34,6 +34,8 @@ import (
|
|||||||
"k8s.io/kubernetes/pkg/api"
|
"k8s.io/kubernetes/pkg/api"
|
||||||
"k8s.io/kubernetes/pkg/api/v1"
|
"k8s.io/kubernetes/pkg/api/v1"
|
||||||
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
|
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
|
||||||
|
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions"
|
||||||
|
"k8s.io/kubernetes/pkg/controller"
|
||||||
)
|
)
|
||||||
|
|
||||||
type testGenerator struct {
|
type testGenerator struct {
|
||||||
@ -220,6 +222,7 @@ func TestTokenCreation(t *testing.T) {
|
|||||||
UpdatedServiceAccount *v1.ServiceAccount
|
UpdatedServiceAccount *v1.ServiceAccount
|
||||||
DeletedServiceAccount *v1.ServiceAccount
|
DeletedServiceAccount *v1.ServiceAccount
|
||||||
AddedSecret *v1.Secret
|
AddedSecret *v1.Secret
|
||||||
|
AddedSecretLocal *v1.Secret
|
||||||
UpdatedSecret *v1.Secret
|
UpdatedSecret *v1.Secret
|
||||||
DeletedSecret *v1.Secret
|
DeletedSecret *v1.Secret
|
||||||
|
|
||||||
@ -306,6 +309,13 @@ func TestTokenCreation(t *testing.T) {
|
|||||||
core.NewUpdateAction(schema.GroupVersionResource{Version: "v1", Resource: "serviceaccounts"}, metav1.NamespaceDefault, serviceAccount(addTokenSecretReference(missingSecretReferences()))),
|
core.NewUpdateAction(schema.GroupVersionResource{Version: "v1", Resource: "serviceaccounts"}, metav1.NamespaceDefault, serviceAccount(addTokenSecretReference(missingSecretReferences()))),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
"new serviceaccount with missing secrets and a local secret in the cache": {
|
||||||
|
ClientObjects: []runtime.Object{serviceAccount(missingSecretReferences())},
|
||||||
|
|
||||||
|
AddedServiceAccount: serviceAccount(tokenSecretReferences()),
|
||||||
|
AddedSecretLocal: serviceAccountTokenSecret(),
|
||||||
|
ExpectedActions: []core.Action{},
|
||||||
|
},
|
||||||
"new serviceaccount with non-token secrets": {
|
"new serviceaccount with non-token secrets": {
|
||||||
ClientObjects: []runtime.Object{serviceAccount(regularSecretReferences()), opaqueSecret()},
|
ClientObjects: []runtime.Object{serviceAccount(regularSecretReferences()), opaqueSecret()},
|
||||||
|
|
||||||
@ -572,38 +582,44 @@ func TestTokenCreation(t *testing.T) {
|
|||||||
for _, reactor := range tc.Reactors {
|
for _, reactor := range tc.Reactors {
|
||||||
client.Fake.PrependReactor(reactor.verb, reactor.resource, reactor.reactor(t))
|
client.Fake.PrependReactor(reactor.verb, reactor.resource, reactor.reactor(t))
|
||||||
}
|
}
|
||||||
|
informers := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
|
||||||
controller := NewTokensController(client, TokensControllerOptions{TokenGenerator: generator, RootCA: []byte("CA Data"), MaxRetries: tc.MaxRetries})
|
secretInformer := informers.Core().V1().Secrets().Informer()
|
||||||
|
secrets := secretInformer.GetStore()
|
||||||
|
serviceAccounts := informers.Core().V1().ServiceAccounts().Informer().GetStore()
|
||||||
|
controller := NewTokensController(informers.Core().V1().ServiceAccounts(), informers.Core().V1().Secrets(), client, TokensControllerOptions{TokenGenerator: generator, RootCA: []byte("CA Data"), MaxRetries: tc.MaxRetries})
|
||||||
|
|
||||||
if tc.ExistingServiceAccount != nil {
|
if tc.ExistingServiceAccount != nil {
|
||||||
controller.serviceAccounts.Add(tc.ExistingServiceAccount)
|
serviceAccounts.Add(tc.ExistingServiceAccount)
|
||||||
}
|
}
|
||||||
for _, s := range tc.ExistingSecrets {
|
for _, s := range tc.ExistingSecrets {
|
||||||
controller.secrets.Add(s)
|
secrets.Add(s)
|
||||||
}
|
}
|
||||||
|
|
||||||
if tc.AddedServiceAccount != nil {
|
if tc.AddedServiceAccount != nil {
|
||||||
controller.serviceAccounts.Add(tc.AddedServiceAccount)
|
serviceAccounts.Add(tc.AddedServiceAccount)
|
||||||
controller.queueServiceAccountSync(tc.AddedServiceAccount)
|
controller.queueServiceAccountSync(tc.AddedServiceAccount)
|
||||||
}
|
}
|
||||||
if tc.UpdatedServiceAccount != nil {
|
if tc.UpdatedServiceAccount != nil {
|
||||||
controller.serviceAccounts.Add(tc.UpdatedServiceAccount)
|
serviceAccounts.Add(tc.UpdatedServiceAccount)
|
||||||
controller.queueServiceAccountUpdateSync(nil, tc.UpdatedServiceAccount)
|
controller.queueServiceAccountUpdateSync(nil, tc.UpdatedServiceAccount)
|
||||||
}
|
}
|
||||||
if tc.DeletedServiceAccount != nil {
|
if tc.DeletedServiceAccount != nil {
|
||||||
controller.serviceAccounts.Delete(tc.DeletedServiceAccount)
|
serviceAccounts.Delete(tc.DeletedServiceAccount)
|
||||||
controller.queueServiceAccountSync(tc.DeletedServiceAccount)
|
controller.queueServiceAccountSync(tc.DeletedServiceAccount)
|
||||||
}
|
}
|
||||||
if tc.AddedSecret != nil {
|
if tc.AddedSecret != nil {
|
||||||
controller.secrets.Add(tc.AddedSecret)
|
secrets.Add(tc.AddedSecret)
|
||||||
controller.queueSecretSync(tc.AddedSecret)
|
controller.queueSecretSync(tc.AddedSecret)
|
||||||
}
|
}
|
||||||
|
if tc.AddedSecretLocal != nil {
|
||||||
|
controller.updatedSecrets.Mutation(tc.AddedSecretLocal)
|
||||||
|
}
|
||||||
if tc.UpdatedSecret != nil {
|
if tc.UpdatedSecret != nil {
|
||||||
controller.secrets.Add(tc.UpdatedSecret)
|
secrets.Add(tc.UpdatedSecret)
|
||||||
controller.queueSecretUpdateSync(nil, tc.UpdatedSecret)
|
controller.queueSecretUpdateSync(nil, tc.UpdatedSecret)
|
||||||
}
|
}
|
||||||
if tc.DeletedSecret != nil {
|
if tc.DeletedSecret != nil {
|
||||||
controller.secrets.Delete(tc.DeletedSecret)
|
secrets.Delete(tc.DeletedSecret)
|
||||||
controller.queueSecretSync(tc.DeletedSecret)
|
controller.queueSecretSync(tc.DeletedSecret)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -408,6 +408,7 @@ func startServiceAccountTestServer(t *testing.T) (*clientset.Clientset, restclie
|
|||||||
serviceAccountAdmission.SetInternalKubeClientSet(internalRootClientset)
|
serviceAccountAdmission.SetInternalKubeClientSet(internalRootClientset)
|
||||||
internalInformers := internalinformers.NewSharedInformerFactory(internalRootClientset, controller.NoResyncPeriodFunc())
|
internalInformers := internalinformers.NewSharedInformerFactory(internalRootClientset, controller.NoResyncPeriodFunc())
|
||||||
serviceAccountAdmission.SetInternalKubeInformerFactory(internalInformers)
|
serviceAccountAdmission.SetInternalKubeInformerFactory(internalInformers)
|
||||||
|
informers := informers.NewSharedInformerFactory(rootClientset, controller.NoResyncPeriodFunc())
|
||||||
|
|
||||||
masterConfig := framework.NewMasterConfig()
|
masterConfig := framework.NewMasterConfig()
|
||||||
masterConfig.GenericConfig.EnableIndex = true
|
masterConfig.GenericConfig.EnableIndex = true
|
||||||
@ -418,10 +419,14 @@ func startServiceAccountTestServer(t *testing.T) (*clientset.Clientset, restclie
|
|||||||
|
|
||||||
// Start the service account and service account token controllers
|
// Start the service account and service account token controllers
|
||||||
stopCh := make(chan struct{})
|
stopCh := make(chan struct{})
|
||||||
tokenController := serviceaccountcontroller.NewTokensController(rootClientset, serviceaccountcontroller.TokensControllerOptions{TokenGenerator: serviceaccount.JWTTokenGenerator(serviceAccountKey)})
|
tokenController := serviceaccountcontroller.NewTokensController(
|
||||||
|
informers.Core().V1().ServiceAccounts(),
|
||||||
|
informers.Core().V1().Secrets(),
|
||||||
|
rootClientset,
|
||||||
|
serviceaccountcontroller.TokensControllerOptions{TokenGenerator: serviceaccount.JWTTokenGenerator(serviceAccountKey)},
|
||||||
|
)
|
||||||
go tokenController.Run(1, stopCh)
|
go tokenController.Run(1, stopCh)
|
||||||
|
|
||||||
informers := informers.NewSharedInformerFactory(rootClientset, controller.NoResyncPeriodFunc())
|
|
||||||
serviceAccountController := serviceaccountcontroller.NewServiceAccountsController(
|
serviceAccountController := serviceaccountcontroller.NewServiceAccountsController(
|
||||||
informers.Core().V1().ServiceAccounts(),
|
informers.Core().V1().ServiceAccounts(),
|
||||||
informers.Core().V1().Namespaces(),
|
informers.Core().V1().Namespaces(),
|
||||||
|
Loading…
Reference in New Issue
Block a user