From 022bff7fbe08f65247da242834af5d899fc68011 Mon Sep 17 00:00:00 2001 From: Andy Goldstein Date: Wed, 15 Feb 2017 14:00:29 -0500 Subject: [PATCH] Switch admission to use shared informers --- cmd/kube-apiserver/app/BUILD | 1 - cmd/kube-apiserver/app/server.go | 11 +- federation/cmd/federation-apiserver/app/BUILD | 2 +- .../cmd/federation-apiserver/app/server.go | 4 +- pkg/kubeapiserver/admission/BUILD | 2 +- pkg/kubeapiserver/admission/initializer.go | 2 +- pkg/kubeapiserver/authorizer/BUILD | 5 +- pkg/kubeapiserver/authorizer/config.go | 45 +++- pkg/kubeapiserver/options/BUILD | 2 +- pkg/kubeapiserver/options/authorization.go | 2 +- plugin/pkg/admission/limitranger/BUILD | 4 +- plugin/pkg/admission/limitranger/admission.go | 13 +- .../admission/limitranger/admission_test.go | 4 +- .../admission/namespace/autoprovision/BUILD | 6 +- .../namespace/autoprovision/admission.go | 40 ++-- .../namespace/autoprovision/admission_test.go | 4 +- plugin/pkg/admission/namespace/exists/BUILD | 6 +- .../admission/namespace/exists/admission.go | 36 ++- .../namespace/exists/admission_test.go | 4 +- .../pkg/admission/namespace/lifecycle/BUILD | 6 +- .../namespace/lifecycle/admission.go | 51 +++-- .../namespace/lifecycle/admission_test.go | 4 +- plugin/pkg/admission/podnodeselector/BUILD | 7 +- .../admission/podnodeselector/admission.go | 35 ++- .../podnodeselector/admission_test.go | 11 +- plugin/pkg/admission/resourcequota/BUILD | 7 +- .../pkg/admission/resourcequota/admission.go | 30 ++- .../admission/resourcequota/admission_test.go | 210 +++++++++--------- .../resourcequota/resource_access.go | 43 +--- .../security/podsecuritypolicy/BUILD | 13 +- .../security/podsecuritypolicy/admission.go | 75 ++----- .../podsecuritypolicy/admission_test.go | 22 +- plugin/pkg/admission/serviceaccount/BUILD | 9 +- .../pkg/admission/serviceaccount/admission.go | 109 +++------ .../serviceaccount/admission_test.go | 75 ++++--- .../pkg/admission/storageclass/default/BUILD | 10 +- .../storageclass/default/admission.go | 77 ++----- .../storageclass/default/admission_test.go | 7 +- .../authorizer/rbac/bootstrappolicy/policy.go | 7 +- .../testdata/cluster-roles.yaml | 1 + test/integration/quota/quota_test.go | 7 + .../serviceaccount/service_account_test.go | 7 +- 42 files changed, 480 insertions(+), 536 deletions(-) diff --git a/cmd/kube-apiserver/app/BUILD b/cmd/kube-apiserver/app/BUILD index 96fa895cbfe..98619f96ff0 100644 --- a/cmd/kube-apiserver/app/BUILD +++ b/cmd/kube-apiserver/app/BUILD @@ -23,7 +23,6 @@ go_library( "//pkg/client/informers/informers_generated/internalversion:go_default_library", "//pkg/cloudprovider:go_default_library", "//pkg/cloudprovider/providers:go_default_library", - "//pkg/controller/informers:go_default_library", "//pkg/controller/serviceaccount:go_default_library", "//pkg/generated/openapi:go_default_library", "//pkg/kubeapiserver:go_default_library", diff --git a/cmd/kube-apiserver/app/server.go b/cmd/kube-apiserver/app/server.go index fcc5a6ea784..757a668587f 100644 --- a/cmd/kube-apiserver/app/server.go +++ b/cmd/kube-apiserver/app/server.go @@ -51,9 +51,8 @@ import ( "k8s.io/kubernetes/pkg/apis/batch" "k8s.io/kubernetes/pkg/capabilities" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" - "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion" + informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion" "k8s.io/kubernetes/pkg/cloudprovider" - "k8s.io/kubernetes/pkg/controller/informers" serviceaccountcontroller "k8s.io/kubernetes/pkg/controller/serviceaccount" generatedopenapi "k8s.io/kubernetes/pkg/generated/openapi" "k8s.io/kubernetes/pkg/kubeapiserver" @@ -270,17 +269,14 @@ func Run(s *options.ServerRunOptions) error { glog.Errorf("Failed to create clientset with KUBE_API_VERSIONS=%q. KUBE_API_VERSIONS is only for testing. Things will break.", kubeAPIVersions) } - // TODO: Internal informers should switch to using 'pkg/client/informers/informers_generated', - // the second informer created here. Refactor clients which take the former to accept the latter. - sharedInformers := informers.NewSharedInformerFactory(nil, client, 10*time.Minute) - internalSharedInformers := internalversion.NewSharedInformerFactory(client, 10*time.Minute) + sharedInformers := informers.NewSharedInformerFactory(client, 10*time.Minute) if client == nil { // TODO: Remove check once client can never be nil. glog.Errorf("Failed to setup bootstrap token authenticator because the loopback clientset was not setup properly.") } else { authenticatorConfig.BootstrapTokenAuthenticator = bootstrap.NewTokenAuthenticator( - internalSharedInformers.Core().InternalVersion().Secrets().Lister().Secrets(v1.NamespaceSystem), + sharedInformers.Core().InternalVersion().Secrets().Lister().Secrets(v1.NamespaceSystem), ) } @@ -367,7 +363,6 @@ func Run(s *options.ServerRunOptions) error { } sharedInformers.Start(wait.NeverStop) - internalSharedInformers.Start(wait.NeverStop) m.GenericAPIServer.PrepareRun().Run(wait.NeverStop) return nil } diff --git a/federation/cmd/federation-apiserver/app/BUILD b/federation/cmd/federation-apiserver/app/BUILD index 5f3170fe18b..edcdedf8d9d 100644 --- a/federation/cmd/federation-apiserver/app/BUILD +++ b/federation/cmd/federation-apiserver/app/BUILD @@ -36,8 +36,8 @@ go_library( "//pkg/apis/extensions:go_default_library", "//pkg/apis/extensions/install:go_default_library", "//pkg/client/clientset_generated/internalclientset:go_default_library", + "//pkg/client/informers/informers_generated/internalversion:go_default_library", "//pkg/cloudprovider/providers:go_default_library", - "//pkg/controller/informers:go_default_library", "//pkg/generated/openapi:go_default_library", "//pkg/kubeapiserver:go_default_library", "//pkg/kubeapiserver/admission:go_default_library", diff --git a/federation/cmd/federation-apiserver/app/server.go b/federation/cmd/federation-apiserver/app/server.go index 998542ac657..bc25d2f739e 100644 --- a/federation/cmd/federation-apiserver/app/server.go +++ b/federation/cmd/federation-apiserver/app/server.go @@ -41,7 +41,7 @@ import ( "k8s.io/kubernetes/federation/cmd/federation-apiserver/app/options" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" - "k8s.io/kubernetes/pkg/controller/informers" + informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion" "k8s.io/kubernetes/pkg/generated/openapi" "k8s.io/kubernetes/pkg/kubeapiserver" kubeapiserveradmission "k8s.io/kubernetes/pkg/kubeapiserver/admission" @@ -159,7 +159,7 @@ func Run(s *options.ServerRunOptions) error { if err != nil { return fmt.Errorf("failed to create clientset: %v", err) } - sharedInformers := informers.NewSharedInformerFactory(nil, client, 10*time.Minute) + sharedInformers := informers.NewSharedInformerFactory(client, 10*time.Minute) authorizationConfig := s.Authorization.ToAuthorizationConfig(sharedInformers) apiAuthorizer, err := authorizationConfig.New() diff --git a/pkg/kubeapiserver/admission/BUILD b/pkg/kubeapiserver/admission/BUILD index e0f63055517..643a947fc9a 100644 --- a/pkg/kubeapiserver/admission/BUILD +++ b/pkg/kubeapiserver/admission/BUILD @@ -25,7 +25,7 @@ go_library( tags = ["automanaged"], deps = [ "//pkg/client/clientset_generated/internalclientset:go_default_library", - "//pkg/controller/informers:go_default_library", + "//pkg/client/informers/informers_generated/internalversion:go_default_library", "//vendor:k8s.io/apiserver/pkg/admission", "//vendor:k8s.io/apiserver/pkg/authorization/authorizer", ], diff --git a/pkg/kubeapiserver/admission/initializer.go b/pkg/kubeapiserver/admission/initializer.go index 131ba98a17f..fe0a3710af4 100644 --- a/pkg/kubeapiserver/admission/initializer.go +++ b/pkg/kubeapiserver/admission/initializer.go @@ -20,7 +20,7 @@ import ( "k8s.io/apiserver/pkg/admission" "k8s.io/apiserver/pkg/authorization/authorizer" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" - "k8s.io/kubernetes/pkg/controller/informers" + informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion" ) // TODO add a `WantsToRun` which takes a stopCh. Might make it generic. diff --git a/pkg/kubeapiserver/authorizer/BUILD b/pkg/kubeapiserver/authorizer/BUILD index 89f46026310..02c559fe4d6 100644 --- a/pkg/kubeapiserver/authorizer/BUILD +++ b/pkg/kubeapiserver/authorizer/BUILD @@ -24,10 +24,13 @@ go_library( srcs = ["config.go"], tags = ["automanaged"], deps = [ + "//pkg/apis/rbac:go_default_library", "//pkg/auth/authorizer/abac:go_default_library", - "//pkg/controller/informers:go_default_library", + "//pkg/client/informers/informers_generated/internalversion:go_default_library", + "//pkg/client/listers/rbac/internalversion:go_default_library", "//pkg/kubeapiserver/authorizer/modes:go_default_library", "//plugin/pkg/auth/authorizer/rbac:go_default_library", + "//vendor:k8s.io/apimachinery/pkg/labels", "//vendor:k8s.io/apiserver/pkg/authorization/authorizer", "//vendor:k8s.io/apiserver/pkg/authorization/authorizerfactory", "//vendor:k8s.io/apiserver/pkg/authorization/union", diff --git a/pkg/kubeapiserver/authorizer/config.go b/pkg/kubeapiserver/authorizer/config.go index 3d6389ceeed..5f5a8a74107 100644 --- a/pkg/kubeapiserver/authorizer/config.go +++ b/pkg/kubeapiserver/authorizer/config.go @@ -21,12 +21,15 @@ import ( "fmt" "time" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apiserver/pkg/authorization/authorizer" "k8s.io/apiserver/pkg/authorization/authorizerfactory" "k8s.io/apiserver/pkg/authorization/union" "k8s.io/apiserver/plugin/pkg/authorizer/webhook" + rbacapi "k8s.io/kubernetes/pkg/apis/rbac" "k8s.io/kubernetes/pkg/auth/authorizer/abac" - "k8s.io/kubernetes/pkg/controller/informers" + informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion" + rbaclisters "k8s.io/kubernetes/pkg/client/listers/rbac/internalversion" "k8s.io/kubernetes/pkg/kubeapiserver/authorizer/modes" "k8s.io/kubernetes/plugin/pkg/auth/authorizer/rbac" ) @@ -56,6 +59,38 @@ type AuthorizationConfig struct { InformerFactory informers.SharedInformerFactory } +type roleGetter struct { + lister rbaclisters.RoleLister +} + +func (g *roleGetter) GetRole(namespace, name string) (*rbacapi.Role, error) { + return g.lister.Roles(namespace).Get(name) +} + +type roleBindingLister struct { + lister rbaclisters.RoleBindingLister +} + +func (l *roleBindingLister) ListRoleBindings(namespace string) ([]*rbacapi.RoleBinding, error) { + return l.lister.RoleBindings(namespace).List(labels.Everything()) +} + +type clusterRoleGetter struct { + lister rbaclisters.ClusterRoleLister +} + +func (g *clusterRoleGetter) GetClusterRole(name string) (*rbacapi.ClusterRole, error) { + return g.lister.Get(name) +} + +type clusterRoleBindingLister struct { + lister rbaclisters.ClusterRoleBindingLister +} + +func (l *clusterRoleBindingLister) ListClusterRoleBindings() ([]*rbacapi.ClusterRoleBinding, error) { + return l.lister.List(labels.Everything()) +} + // New returns the right sort of union of multiple authorizer.Authorizer objects // based on the authorizationMode or an error. func (config AuthorizationConfig) New() (authorizer.Authorizer, error) { @@ -98,10 +133,10 @@ func (config AuthorizationConfig) New() (authorizer.Authorizer, error) { authorizers = append(authorizers, webhookAuthorizer) case modes.ModeRBAC: rbacAuthorizer := rbac.New( - config.InformerFactory.Roles().Lister(), - config.InformerFactory.RoleBindings().Lister(), - config.InformerFactory.ClusterRoles().Lister(), - config.InformerFactory.ClusterRoleBindings().Lister(), + &roleGetter{config.InformerFactory.Rbac().InternalVersion().Roles().Lister()}, + &roleBindingLister{config.InformerFactory.Rbac().InternalVersion().RoleBindings().Lister()}, + &clusterRoleGetter{config.InformerFactory.Rbac().InternalVersion().ClusterRoles().Lister()}, + &clusterRoleBindingLister{config.InformerFactory.Rbac().InternalVersion().ClusterRoleBindings().Lister()}, ) authorizers = append(authorizers, rbacAuthorizer) default: diff --git a/pkg/kubeapiserver/options/BUILD b/pkg/kubeapiserver/options/BUILD index 15d867467cb..488b423880b 100644 --- a/pkg/kubeapiserver/options/BUILD +++ b/pkg/kubeapiserver/options/BUILD @@ -21,8 +21,8 @@ go_library( deps = [ "//pkg/api:go_default_library", "//pkg/api/v1:go_default_library", + "//pkg/client/informers/informers_generated/internalversion:go_default_library", "//pkg/cloudprovider:go_default_library", - "//pkg/controller/informers:go_default_library", "//pkg/kubeapiserver/authenticator:go_default_library", "//pkg/kubeapiserver/authorizer:go_default_library", "//pkg/kubeapiserver/authorizer/modes:go_default_library", diff --git a/pkg/kubeapiserver/options/authorization.go b/pkg/kubeapiserver/options/authorization.go index de46ab3849b..b5c8aca1eb2 100644 --- a/pkg/kubeapiserver/options/authorization.go +++ b/pkg/kubeapiserver/options/authorization.go @@ -22,7 +22,7 @@ import ( "github.com/spf13/pflag" - "k8s.io/kubernetes/pkg/controller/informers" + informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion" "k8s.io/kubernetes/pkg/kubeapiserver/authorizer" authzmodes "k8s.io/kubernetes/pkg/kubeapiserver/authorizer/modes" ) diff --git a/plugin/pkg/admission/limitranger/BUILD b/plugin/pkg/admission/limitranger/BUILD index 8f2ff129ac4..73094c28ee0 100644 --- a/plugin/pkg/admission/limitranger/BUILD +++ b/plugin/pkg/admission/limitranger/BUILD @@ -18,8 +18,8 @@ go_library( deps = [ "//pkg/api:go_default_library", "//pkg/client/clientset_generated/internalclientset:go_default_library", + "//pkg/client/informers/informers_generated/internalversion:go_default_library", "//pkg/client/listers/core/internalversion:go_default_library", - "//pkg/controller/informers:go_default_library", "//pkg/kubeapiserver/admission:go_default_library", "//vendor:github.com/hashicorp/golang-lru", "//vendor:k8s.io/apimachinery/pkg/api/meta", @@ -41,7 +41,7 @@ go_test( "//pkg/api:go_default_library", "//pkg/client/clientset_generated/internalclientset:go_default_library", "//pkg/client/clientset_generated/internalclientset/fake:go_default_library", - "//pkg/controller/informers:go_default_library", + "//pkg/client/informers/informers_generated/internalversion:go_default_library", "//pkg/kubeapiserver/admission:go_default_library", "//vendor:k8s.io/apimachinery/pkg/api/equality", "//vendor:k8s.io/apimachinery/pkg/api/resource", diff --git a/plugin/pkg/admission/limitranger/admission.go b/plugin/pkg/admission/limitranger/admission.go index 6f467ed3698..b7d1cba7ab8 100644 --- a/plugin/pkg/admission/limitranger/admission.go +++ b/plugin/pkg/admission/limitranger/admission.go @@ -34,8 +34,8 @@ import ( "k8s.io/apiserver/pkg/admission" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" - coreinternallisters "k8s.io/kubernetes/pkg/client/listers/core/internalversion" - "k8s.io/kubernetes/pkg/controller/informers" + informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion" + corelisters "k8s.io/kubernetes/pkg/client/listers/core/internalversion" kubeapiserveradmission "k8s.io/kubernetes/pkg/kubeapiserver/admission" ) @@ -54,7 +54,7 @@ type limitRanger struct { *admission.Handler client internalclientset.Interface actions LimitRangerActions - lister coreinternallisters.LimitRangeLister + lister corelisters.LimitRangeLister // liveLookups holds the last few live lookups we've done to help ammortize cost on repeated lookup failures. // This let's us handle the case of latent caches, by looking up actual results for a namespace on cache miss/no results. @@ -69,9 +69,9 @@ type liveLookupEntry struct { } func (l *limitRanger) SetInformerFactory(f informers.SharedInformerFactory) { - limitRangeInformer := f.InternalLimitRanges().Informer() - l.SetReadyFunc(limitRangeInformer.HasSynced) - l.lister = f.InternalLimitRanges().Lister() + limitRangeInformer := f.Core().InternalVersion().LimitRanges() + l.SetReadyFunc(limitRangeInformer.Informer().HasSynced) + l.lister = limitRangeInformer.Lister() } func (l *limitRanger) Validate() error { @@ -167,6 +167,7 @@ func NewLimitRanger(actions LimitRangerActions) (admission.Interface, error) { }, nil } +var _ = kubeapiserveradmission.WantsInformerFactory(&limitRanger{}) var _ = kubeapiserveradmission.WantsInternalClientSet(&limitRanger{}) func (a *limitRanger) SetInternalClientSet(client internalclientset.Interface) { diff --git a/plugin/pkg/admission/limitranger/admission_test.go b/plugin/pkg/admission/limitranger/admission_test.go index d445d61340f..e4482dd1b44 100644 --- a/plugin/pkg/admission/limitranger/admission_test.go +++ b/plugin/pkg/admission/limitranger/admission_test.go @@ -32,7 +32,7 @@ import ( "k8s.io/kubernetes/pkg/api" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" - "k8s.io/kubernetes/pkg/controller/informers" + informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion" kubeadmission "k8s.io/kubernetes/pkg/kubeapiserver/admission" ) @@ -590,7 +590,7 @@ func newMockClientForTest(limitRanges []api.LimitRange) *fake.Clientset { // newHandlerForTest returns a handler configured for testing. func newHandlerForTest(c clientset.Interface) (admission.Interface, informers.SharedInformerFactory, error) { - f := informers.NewSharedInformerFactory(nil, c, 5*time.Minute) + f := informers.NewSharedInformerFactory(c, 5*time.Minute) handler, err := NewLimitRanger(&DefaultLimitRangerActions{}) if err != nil { return nil, f, err diff --git a/plugin/pkg/admission/namespace/autoprovision/BUILD b/plugin/pkg/admission/namespace/autoprovision/BUILD index e79e6a6ff15..f79073f0bc7 100644 --- a/plugin/pkg/admission/namespace/autoprovision/BUILD +++ b/plugin/pkg/admission/namespace/autoprovision/BUILD @@ -15,12 +15,12 @@ go_library( deps = [ "//pkg/api:go_default_library", "//pkg/client/clientset_generated/internalclientset:go_default_library", - "//pkg/controller/informers:go_default_library", + "//pkg/client/informers/informers_generated/internalversion:go_default_library", + "//pkg/client/listers/core/internalversion:go_default_library", "//pkg/kubeapiserver/admission:go_default_library", "//vendor:k8s.io/apimachinery/pkg/api/errors", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", "//vendor:k8s.io/apiserver/pkg/admission", - "//vendor:k8s.io/client-go/tools/cache", ], ) @@ -33,7 +33,7 @@ go_test( "//pkg/api:go_default_library", "//pkg/client/clientset_generated/internalclientset:go_default_library", "//pkg/client/clientset_generated/internalclientset/fake:go_default_library", - "//pkg/controller/informers:go_default_library", + "//pkg/client/informers/informers_generated/internalversion:go_default_library", "//pkg/kubeapiserver/admission:go_default_library", "//vendor:k8s.io/apimachinery/pkg/api/errors", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", diff --git a/plugin/pkg/admission/namespace/autoprovision/admission.go b/plugin/pkg/admission/namespace/autoprovision/admission.go index 3fdee637fb2..b61e3c4a532 100644 --- a/plugin/pkg/admission/namespace/autoprovision/admission.go +++ b/plugin/pkg/admission/namespace/autoprovision/admission.go @@ -23,10 +23,10 @@ import ( "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apiserver/pkg/admission" - "k8s.io/client-go/tools/cache" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" - "k8s.io/kubernetes/pkg/controller/informers" + informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion" + corelisters "k8s.io/kubernetes/pkg/client/listers/core/internalversion" kubeapiserveradmission "k8s.io/kubernetes/pkg/kubeapiserver/admission" ) @@ -41,14 +41,14 @@ func init() { // It is useful in deployments that do not want to restrict creation of a namespace prior to its usage. type provision struct { *admission.Handler - client internalclientset.Interface - namespaceInformer cache.SharedIndexInformer + client internalclientset.Interface + namespaceLister corelisters.NamespaceLister } var _ = kubeapiserveradmission.WantsInformerFactory(&provision{}) -var _ = kubeapiserveradmission.WantsInformerFactory(&provision{}) +var _ = kubeapiserveradmission.WantsInternalClientSet(&provision{}) -func (p *provision) Admit(a admission.Attributes) (err error) { +func (p *provision) Admit(a admission.Attributes) error { // if we're here, then we've already passed authentication, so we're allowed to do what we're trying to do // if we're here, then the API server has found a route, which means that if we have a non-empty namespace // its a namespaced resource. @@ -59,6 +59,16 @@ func (p *provision) Admit(a admission.Attributes) (err error) { if !p.WaitForReady() { return admission.NewForbidden(a, fmt.Errorf("not yet ready to handle request")) } + + _, err := p.namespaceLister.Get(a.GetNamespace()) + if err == nil { + return nil + } + + if !errors.IsNotFound(err) { + return admission.NewForbidden(a, err) + } + namespace := &api.Namespace{ ObjectMeta: metav1.ObjectMeta{ Name: a.GetNamespace(), @@ -66,17 +76,12 @@ func (p *provision) Admit(a admission.Attributes) (err error) { }, Status: api.NamespaceStatus{}, } - _, exists, err := p.namespaceInformer.GetStore().Get(namespace) - if err != nil { - return admission.NewForbidden(a, err) - } - if exists { - return nil - } + _, err = p.client.Core().Namespaces().Create(namespace) if err != nil && !errors.IsAlreadyExists(err) { return admission.NewForbidden(a, err) } + return nil } @@ -92,13 +97,14 @@ func (p *provision) SetInternalClientSet(client internalclientset.Interface) { } func (p *provision) SetInformerFactory(f informers.SharedInformerFactory) { - p.namespaceInformer = f.InternalNamespaces().Informer() - p.SetReadyFunc(p.namespaceInformer.HasSynced) + namespaceInformer := f.Core().InternalVersion().Namespaces() + p.namespaceLister = namespaceInformer.Lister() + p.SetReadyFunc(namespaceInformer.Informer().HasSynced) } func (p *provision) Validate() error { - if p.namespaceInformer == nil { - return fmt.Errorf("missing namespaceInformer") + if p.namespaceLister == nil { + return fmt.Errorf("missing namespaceLister") } if p.client == nil { return fmt.Errorf("missing client") diff --git a/plugin/pkg/admission/namespace/autoprovision/admission_test.go b/plugin/pkg/admission/namespace/autoprovision/admission_test.go index a36e35a6d01..c03f36cb7b9 100644 --- a/plugin/pkg/admission/namespace/autoprovision/admission_test.go +++ b/plugin/pkg/admission/namespace/autoprovision/admission_test.go @@ -30,13 +30,13 @@ import ( "k8s.io/kubernetes/pkg/api" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" - "k8s.io/kubernetes/pkg/controller/informers" + informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion" kubeadmission "k8s.io/kubernetes/pkg/kubeapiserver/admission" ) // newHandlerForTest returns the admission controller configured for testing. func newHandlerForTest(c clientset.Interface) (admission.Interface, informers.SharedInformerFactory, error) { - f := informers.NewSharedInformerFactory(nil, c, 5*time.Minute) + f := informers.NewSharedInformerFactory(c, 5*time.Minute) handler := NewProvision() pluginInitializer := kubeadmission.NewPluginInitializer(c, f, nil) pluginInitializer.Initialize(handler) diff --git a/plugin/pkg/admission/namespace/exists/BUILD b/plugin/pkg/admission/namespace/exists/BUILD index e8ecbc8d0a5..e1938dce52a 100644 --- a/plugin/pkg/admission/namespace/exists/BUILD +++ b/plugin/pkg/admission/namespace/exists/BUILD @@ -15,12 +15,12 @@ go_library( deps = [ "//pkg/api:go_default_library", "//pkg/client/clientset_generated/internalclientset:go_default_library", - "//pkg/controller/informers:go_default_library", + "//pkg/client/informers/informers_generated/internalversion:go_default_library", + "//pkg/client/listers/core/internalversion:go_default_library", "//pkg/kubeapiserver/admission:go_default_library", "//vendor:k8s.io/apimachinery/pkg/api/errors", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", "//vendor:k8s.io/apiserver/pkg/admission", - "//vendor:k8s.io/client-go/tools/cache", ], ) @@ -33,7 +33,7 @@ go_test( "//pkg/api:go_default_library", "//pkg/client/clientset_generated/internalclientset:go_default_library", "//pkg/client/clientset_generated/internalclientset/fake:go_default_library", - "//pkg/controller/informers:go_default_library", + "//pkg/client/informers/informers_generated/internalversion:go_default_library", "//pkg/kubeapiserver/admission:go_default_library", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", "//vendor:k8s.io/apimachinery/pkg/runtime", diff --git a/plugin/pkg/admission/namespace/exists/admission.go b/plugin/pkg/admission/namespace/exists/admission.go index ad042211e55..06587bbae48 100644 --- a/plugin/pkg/admission/namespace/exists/admission.go +++ b/plugin/pkg/admission/namespace/exists/admission.go @@ -23,10 +23,10 @@ import ( "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apiserver/pkg/admission" - "k8s.io/client-go/tools/cache" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" - "k8s.io/kubernetes/pkg/controller/informers" + informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion" + corelisters "k8s.io/kubernetes/pkg/client/listers/core/internalversion" kubeapiserveradmission "k8s.io/kubernetes/pkg/kubeapiserver/admission" ) @@ -41,14 +41,14 @@ func init() { // It is useful in deployments that want to enforce pre-declaration of a Namespace resource. type exists struct { *admission.Handler - client internalclientset.Interface - namespaceInformer cache.SharedIndexInformer + client internalclientset.Interface + namespaceLister corelisters.NamespaceLister } var _ = kubeapiserveradmission.WantsInformerFactory(&exists{}) var _ = kubeapiserveradmission.WantsInternalClientSet(&exists{}) -func (e *exists) Admit(a admission.Attributes) (err error) { +func (e *exists) Admit(a admission.Attributes) error { // if we're here, then we've already passed authentication, so we're allowed to do what we're trying to do // if we're here, then the API server has found a route, which means that if we have a non-empty namespace // its a namespaced resource. @@ -60,20 +60,13 @@ func (e *exists) Admit(a admission.Attributes) (err error) { if !e.WaitForReady() { return admission.NewForbidden(a, fmt.Errorf("not yet ready to handle request")) } - namespace := &api.Namespace{ - ObjectMeta: metav1.ObjectMeta{ - Name: a.GetNamespace(), - Namespace: "", - }, - Status: api.NamespaceStatus{}, - } - _, exists, err := e.namespaceInformer.GetStore().Get(namespace) - if err != nil { - return errors.NewInternalError(err) - } - if exists { + _, err := e.namespaceLister.Get(a.GetNamespace()) + if err == nil { return nil } + if !errors.IsNotFound(err) { + return errors.NewInternalError(err) + } // in case of latency in our caches, make a call direct to storage to verify that it truly exists or not _, err = e.client.Core().Namespaces().Get(a.GetNamespace(), metav1.GetOptions{}) @@ -99,13 +92,14 @@ func (e *exists) SetInternalClientSet(client internalclientset.Interface) { } func (e *exists) SetInformerFactory(f informers.SharedInformerFactory) { - e.namespaceInformer = f.InternalNamespaces().Informer() - e.SetReadyFunc(e.namespaceInformer.HasSynced) + namespaceInformer := f.Core().InternalVersion().Namespaces() + e.namespaceLister = namespaceInformer.Lister() + e.SetReadyFunc(namespaceInformer.Informer().HasSynced) } func (e *exists) Validate() error { - if e.namespaceInformer == nil { - return fmt.Errorf("missing namespaceInformer") + if e.namespaceLister == nil { + return fmt.Errorf("missing namespaceLister") } if e.client == nil { return fmt.Errorf("missing client") diff --git a/plugin/pkg/admission/namespace/exists/admission_test.go b/plugin/pkg/admission/namespace/exists/admission_test.go index 3e9dd39ae60..389a0240a1d 100644 --- a/plugin/pkg/admission/namespace/exists/admission_test.go +++ b/plugin/pkg/admission/namespace/exists/admission_test.go @@ -29,13 +29,13 @@ import ( "k8s.io/kubernetes/pkg/api" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" - "k8s.io/kubernetes/pkg/controller/informers" + informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion" kubeadmission "k8s.io/kubernetes/pkg/kubeapiserver/admission" ) // newHandlerForTest returns the admission controller configured for testing. func newHandlerForTest(c clientset.Interface) (admission.Interface, informers.SharedInformerFactory, error) { - f := informers.NewSharedInformerFactory(nil, c, 5*time.Minute) + f := informers.NewSharedInformerFactory(c, 5*time.Minute) handler := NewExists() pluginInitializer := kubeadmission.NewPluginInitializer(c, f, nil) pluginInitializer.Initialize(handler) diff --git a/plugin/pkg/admission/namespace/lifecycle/BUILD b/plugin/pkg/admission/namespace/lifecycle/BUILD index 979c78fbe13..a079e05267d 100644 --- a/plugin/pkg/admission/namespace/lifecycle/BUILD +++ b/plugin/pkg/admission/namespace/lifecycle/BUILD @@ -15,7 +15,8 @@ go_library( deps = [ "//pkg/api:go_default_library", "//pkg/client/clientset_generated/internalclientset:go_default_library", - "//pkg/controller/informers:go_default_library", + "//pkg/client/informers/informers_generated/internalversion:go_default_library", + "//pkg/client/listers/core/internalversion:go_default_library", "//pkg/kubeapiserver/admission:go_default_library", "//vendor:github.com/golang/glog", "//vendor:k8s.io/apimachinery/pkg/api/errors", @@ -23,7 +24,6 @@ go_library( "//vendor:k8s.io/apimachinery/pkg/util/sets", "//vendor:k8s.io/apiserver/pkg/admission", "//vendor:k8s.io/apiserver/pkg/util/cache", - "//vendor:k8s.io/client-go/tools/cache", "//vendor:k8s.io/client-go/util/clock", ], ) @@ -37,7 +37,7 @@ go_test( "//pkg/api:go_default_library", "//pkg/client/clientset_generated/internalclientset:go_default_library", "//pkg/client/clientset_generated/internalclientset/fake:go_default_library", - "//pkg/controller/informers:go_default_library", + "//pkg/client/informers/informers_generated/internalversion:go_default_library", "//pkg/kubeapiserver/admission:go_default_library", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", "//vendor:k8s.io/apimachinery/pkg/runtime", diff --git a/plugin/pkg/admission/namespace/lifecycle/admission.go b/plugin/pkg/admission/namespace/lifecycle/admission.go index 52a56bd2e4c..e460a48bc49 100644 --- a/plugin/pkg/admission/namespace/lifecycle/admission.go +++ b/plugin/pkg/admission/namespace/lifecycle/admission.go @@ -28,11 +28,11 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apiserver/pkg/admission" utilcache "k8s.io/apiserver/pkg/util/cache" - "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/clock" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" - "k8s.io/kubernetes/pkg/controller/informers" + informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion" + corelisters "k8s.io/kubernetes/pkg/client/listers/core/internalversion" kubeapiserveradmission "k8s.io/kubernetes/pkg/kubeapiserver/admission" ) @@ -61,7 +61,7 @@ type lifecycle struct { *admission.Handler client internalclientset.Interface immortalNamespaces sets.String - namespaceInformer cache.SharedIndexInformer + namespaceLister corelisters.NamespaceLister // forceLiveLookupCache holds a list of entries for namespaces that we have a strong reason to believe are stale in our local cache. // if a namespace is in this cache, then we will ignore our local state and always fetch latest from api server. forceLiveLookupCache *utilcache.LRUExpireCache @@ -109,24 +109,31 @@ func (l *lifecycle) Admit(a admission.Attributes) error { } var ( - namespaceObj interface{} - exists bool - err error + exists bool + err error ) - key := makeNamespaceKey(a.GetNamespace()) - namespaceObj, exists, err = l.namespaceInformer.GetStore().Get(key) + namespace, err := l.namespaceLister.Get(a.GetNamespace()) if err != nil { - return errors.NewInternalError(err) + if !errors.IsNotFound(err) { + return errors.NewInternalError(err) + } + } else { + exists = true } if !exists && a.GetOperation() == admission.Create { // give the cache time to observe the namespace before rejecting a create. // this helps when creating a namespace and immediately creating objects within it. time.Sleep(missingNamespaceWait) - namespaceObj, exists, err = l.namespaceInformer.GetStore().Get(key) - if err != nil { + namespace, err = l.namespaceLister.Get(a.GetNamespace()) + switch { + case errors.IsNotFound(err): + // no-op + case err != nil: return errors.NewInternalError(err) + default: + exists = true } if exists { glog.V(4).Infof("found %s in cache after waiting", a.GetNamespace()) @@ -137,17 +144,17 @@ func (l *lifecycle) Admit(a admission.Attributes) error { forceLiveLookup := false if _, ok := l.forceLiveLookupCache.Get(a.GetNamespace()); ok { // we think the namespace was marked for deletion, but our current local cache says otherwise, we will force a live lookup. - forceLiveLookup = exists && namespaceObj.(*api.Namespace).Status.Phase == api.NamespaceActive + forceLiveLookup = exists && namespace.Status.Phase == api.NamespaceActive } // refuse to operate on non-existent namespaces if !exists || forceLiveLookup { // as a last resort, make a call directly to storage - namespaceObj, err = l.client.Core().Namespaces().Get(a.GetNamespace(), metav1.GetOptions{}) - if err != nil { - if errors.IsNotFound(err) { - return err - } + namespace, err = l.client.Core().Namespaces().Get(a.GetNamespace(), metav1.GetOptions{}) + switch { + case errors.IsNotFound(err): + return err + case err != nil: return errors.NewInternalError(err) } glog.V(4).Infof("found %s via storage lookup", a.GetNamespace()) @@ -155,7 +162,6 @@ func (l *lifecycle) Admit(a admission.Attributes) error { // ensure that we're not trying to create objects in terminating namespaces if a.GetOperation() == admission.Create { - namespace := namespaceObj.(*api.Namespace) if namespace.Status.Phase != api.NamespaceTerminating { return nil } @@ -182,8 +188,9 @@ func newLifecycleWithClock(immortalNamespaces sets.String, clock utilcache.Clock } func (l *lifecycle) SetInformerFactory(f informers.SharedInformerFactory) { - l.namespaceInformer = f.InternalNamespaces().Informer() - l.SetReadyFunc(l.namespaceInformer.HasSynced) + namespaceInformer := f.Core().InternalVersion().Namespaces() + l.namespaceLister = namespaceInformer.Lister() + l.SetReadyFunc(namespaceInformer.Informer().HasSynced) } func (l *lifecycle) SetInternalClientSet(client internalclientset.Interface) { @@ -191,8 +198,8 @@ func (l *lifecycle) SetInternalClientSet(client internalclientset.Interface) { } func (l *lifecycle) Validate() error { - if l.namespaceInformer == nil { - return fmt.Errorf("missing namespaceInformer") + if l.namespaceLister == nil { + return fmt.Errorf("missing namespaceLister") } if l.client == nil { return fmt.Errorf("missing client") diff --git a/plugin/pkg/admission/namespace/lifecycle/admission_test.go b/plugin/pkg/admission/namespace/lifecycle/admission_test.go index b5a9cb597e3..28be63b4731 100644 --- a/plugin/pkg/admission/namespace/lifecycle/admission_test.go +++ b/plugin/pkg/admission/namespace/lifecycle/admission_test.go @@ -31,7 +31,7 @@ import ( "k8s.io/kubernetes/pkg/api" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" - "k8s.io/kubernetes/pkg/controller/informers" + informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion" kubeadmission "k8s.io/kubernetes/pkg/kubeapiserver/admission" ) @@ -42,7 +42,7 @@ func newHandlerForTest(c clientset.Interface) (admission.Interface, informers.Sh // newHandlerForTestWithClock returns a configured handler for testing. func newHandlerForTestWithClock(c clientset.Interface, cacheClock clock.Clock) (admission.Interface, informers.SharedInformerFactory, error) { - f := informers.NewSharedInformerFactory(nil, c, 5*time.Minute) + f := informers.NewSharedInformerFactory(c, 5*time.Minute) handler, err := newLifecycleWithClock(sets.NewString(metav1.NamespaceDefault, metav1.NamespaceSystem), cacheClock) if err != nil { return nil, f, err diff --git a/plugin/pkg/admission/podnodeselector/BUILD b/plugin/pkg/admission/podnodeselector/BUILD index 30eadb32808..aadc586f57d 100644 --- a/plugin/pkg/admission/podnodeselector/BUILD +++ b/plugin/pkg/admission/podnodeselector/BUILD @@ -15,7 +15,8 @@ go_library( deps = [ "//pkg/api:go_default_library", "//pkg/client/clientset_generated/internalclientset:go_default_library", - "//pkg/controller/informers:go_default_library", + "//pkg/client/informers/informers_generated/internalversion:go_default_library", + "//pkg/client/listers/core/internalversion:go_default_library", "//pkg/kubeapiserver/admission:go_default_library", "//vendor:github.com/golang/glog", "//vendor:k8s.io/apimachinery/pkg/api/errors", @@ -23,7 +24,6 @@ go_library( "//vendor:k8s.io/apimachinery/pkg/labels", "//vendor:k8s.io/apimachinery/pkg/util/yaml", "//vendor:k8s.io/apiserver/pkg/admission", - "//vendor:k8s.io/client-go/tools/cache", ], ) @@ -36,11 +36,10 @@ go_test( "//pkg/api:go_default_library", "//pkg/client/clientset_generated/internalclientset:go_default_library", "//pkg/client/clientset_generated/internalclientset/fake:go_default_library", - "//pkg/controller/informers:go_default_library", + "//pkg/client/informers/informers_generated/internalversion:go_default_library", "//pkg/kubeapiserver/admission:go_default_library", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", "//vendor:k8s.io/apimachinery/pkg/labels", - "//vendor:k8s.io/apimachinery/pkg/util/wait", "//vendor:k8s.io/apiserver/pkg/admission", ], ) diff --git a/plugin/pkg/admission/podnodeselector/admission.go b/plugin/pkg/admission/podnodeselector/admission.go index 2c5b6320f9f..c1a3fe2307e 100644 --- a/plugin/pkg/admission/podnodeselector/admission.go +++ b/plugin/pkg/admission/podnodeselector/admission.go @@ -28,10 +28,10 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/yaml" "k8s.io/apiserver/pkg/admission" - "k8s.io/client-go/tools/cache" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" - "k8s.io/kubernetes/pkg/controller/informers" + informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion" + corelisters "k8s.io/kubernetes/pkg/client/listers/core/internalversion" kubeapiserveradmission "k8s.io/kubernetes/pkg/kubeapiserver/admission" ) @@ -51,13 +51,14 @@ func init() { // podNodeSelector is an implementation of admission.Interface. type podNodeSelector struct { *admission.Handler - client internalclientset.Interface - namespaceInformer cache.SharedIndexInformer + client internalclientset.Interface + namespaceLister corelisters.NamespaceLister // global default node selector and namespace whitelists in a cluster. clusterNodeSelectors map[string]string } var _ = kubeapiserveradmission.WantsInternalClientSet(&podNodeSelector{}) +var _ = kubeapiserveradmission.WantsInformerFactory(&podNodeSelector{}) type pluginConfig struct { PodNodeSelectorPluginConfig map[string]string @@ -114,18 +115,8 @@ func (p *podNodeSelector) Admit(a admission.Attributes) error { nsName := a.GetNamespace() var namespace *api.Namespace - namespaceObj, exists, err := p.namespaceInformer.GetStore().Get(&api.Namespace{ - ObjectMeta: metav1.ObjectMeta{ - Name: nsName, - Namespace: "", - }, - }) - if err != nil { - return errors.NewInternalError(err) - } - if exists { - namespace = namespaceObj.(*api.Namespace) - } else { + namespace, err := p.namespaceLister.Get(nsName) + if errors.IsNotFound(err) { namespace, err = p.defaultGetNamespace(nsName) if err != nil { if errors.IsNotFound(err) { @@ -133,7 +124,10 @@ func (p *podNodeSelector) Admit(a admission.Attributes) error { } return errors.NewInternalError(err) } + } else if err != nil { + return errors.NewInternalError(err) } + namespaceNodeSelector, err := p.getNodeSelectorMap(namespace) if err != nil { return err @@ -173,13 +167,14 @@ func (a *podNodeSelector) SetInternalClientSet(client internalclientset.Interfac } func (p *podNodeSelector) SetInformerFactory(f informers.SharedInformerFactory) { - p.namespaceInformer = f.InternalNamespaces().Informer() - p.SetReadyFunc(p.namespaceInformer.HasSynced) + namespaceInformer := f.Core().InternalVersion().Namespaces() + p.namespaceLister = namespaceInformer.Lister() + p.SetReadyFunc(namespaceInformer.Informer().HasSynced) } func (p *podNodeSelector) Validate() error { - if p.namespaceInformer == nil { - return fmt.Errorf("missing namespaceInformer") + if p.namespaceLister == nil { + return fmt.Errorf("missing namespaceLister") } if p.client == nil { return fmt.Errorf("missing client") diff --git a/plugin/pkg/admission/podnodeselector/admission_test.go b/plugin/pkg/admission/podnodeselector/admission_test.go index c638fa9d05f..4e6d874e799 100644 --- a/plugin/pkg/admission/podnodeselector/admission_test.go +++ b/plugin/pkg/admission/podnodeselector/admission_test.go @@ -22,12 +22,11 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apiserver/pkg/admission" "k8s.io/kubernetes/pkg/api" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" - "k8s.io/kubernetes/pkg/controller/informers" + informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion" kubeadmission "k8s.io/kubernetes/pkg/kubeapiserver/admission" ) @@ -45,7 +44,9 @@ func TestPodAdmission(t *testing.T) { if err != nil { t.Errorf("unexpected error initializing handler: %v", err) } - informerFactory.Start(wait.NeverStop) + stopCh := make(chan struct{}) + defer close(stopCh) + informerFactory.Start(stopCh) pod := &api.Pod{ ObjectMeta: metav1.ObjectMeta{Name: "testPod", Namespace: "testNamespace"}, @@ -144,7 +145,7 @@ func TestPodAdmission(t *testing.T) { for _, test := range tests { if !test.ignoreTestNamespaceNodeSelector { namespace.ObjectMeta.Annotations = map[string]string{"scheduler.alpha.kubernetes.io/node-selector": test.namespaceNodeSelector} - handler.namespaceInformer.GetStore().Update(namespace) + informerFactory.Core().InternalVersion().Namespaces().Informer().GetStore().Update(namespace) } handler.clusterNodeSelectors = make(map[string]string) handler.clusterNodeSelectors["clusterDefaultNodeSelector"] = test.defaultNodeSelector @@ -180,7 +181,7 @@ func TestHandles(t *testing.T) { // newHandlerForTest returns the admission controller configured for testing. func newHandlerForTest(c clientset.Interface) (*podNodeSelector, informers.SharedInformerFactory, error) { - f := informers.NewSharedInformerFactory(nil, c, 5*time.Minute) + f := informers.NewSharedInformerFactory(c, 5*time.Minute) handler := NewPodNodeSelector(nil) pluginInitializer := kubeadmission.NewPluginInitializer(c, f, nil) pluginInitializer.Initialize(handler) diff --git a/plugin/pkg/admission/resourcequota/BUILD b/plugin/pkg/admission/resourcequota/BUILD index c693c4f7119..83e172d69c7 100644 --- a/plugin/pkg/admission/resourcequota/BUILD +++ b/plugin/pkg/admission/resourcequota/BUILD @@ -21,6 +21,8 @@ go_library( deps = [ "//pkg/api:go_default_library", "//pkg/client/clientset_generated/internalclientset:go_default_library", + "//pkg/client/informers/informers_generated/internalversion:go_default_library", + "//pkg/client/listers/core/internalversion:go_default_library", "//pkg/kubeapiserver/admission:go_default_library", "//pkg/quota:go_default_library", "//pkg/quota/install:go_default_library", @@ -35,16 +37,15 @@ go_library( "//vendor:k8s.io/apimachinery/pkg/apimachinery/announced", "//vendor:k8s.io/apimachinery/pkg/apimachinery/registered", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", + "//vendor:k8s.io/apimachinery/pkg/labels", "//vendor:k8s.io/apimachinery/pkg/runtime", "//vendor:k8s.io/apimachinery/pkg/runtime/schema", "//vendor:k8s.io/apimachinery/pkg/runtime/serializer", "//vendor:k8s.io/apimachinery/pkg/util/runtime", "//vendor:k8s.io/apimachinery/pkg/util/sets", "//vendor:k8s.io/apimachinery/pkg/util/wait", - "//vendor:k8s.io/apimachinery/pkg/watch", "//vendor:k8s.io/apiserver/pkg/admission", "//vendor:k8s.io/apiserver/pkg/storage/etcd", - "//vendor:k8s.io/client-go/tools/cache", "//vendor:k8s.io/client-go/util/workqueue", ], ) @@ -57,6 +58,8 @@ go_test( deps = [ "//pkg/api:go_default_library", "//pkg/client/clientset_generated/internalclientset/fake:go_default_library", + "//pkg/client/informers/informers_generated/internalversion:go_default_library", + "//pkg/controller:go_default_library", "//pkg/quota:go_default_library", "//pkg/quota/generic:go_default_library", "//pkg/quota/install:go_default_library", diff --git a/plugin/pkg/admission/resourcequota/admission.go b/plugin/pkg/admission/resourcequota/admission.go index 2d9ee4f7f5b..d65d557c814 100644 --- a/plugin/pkg/admission/resourcequota/admission.go +++ b/plugin/pkg/admission/resourcequota/admission.go @@ -24,6 +24,7 @@ import ( "k8s.io/apiserver/pkg/admission" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" + informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion" kubeapiserveradmission "k8s.io/kubernetes/pkg/kubeapiserver/admission" "k8s.io/kubernetes/pkg/quota" "k8s.io/kubernetes/pkg/quota/install" @@ -59,6 +60,7 @@ type quotaAdmission struct { stopCh <-chan struct{} registry quota.Registry numEvaluators int + quotaAccessor *quotaAccessor evaluator Evaluator } @@ -73,29 +75,41 @@ type liveLookupEntry struct { // using the provided registry. The registry must have the capability to handle group/kinds that // are persisted by the server this admission controller is intercepting func NewResourceQuota(registry quota.Registry, config *resourcequotaapi.Configuration, numEvaluators int, stopCh <-chan struct{}) (admission.Interface, error) { + quotaAccessor, err := newQuotaAccessor() + if err != nil { + return nil, err + } + return "aAdmission{ Handler: admission.NewHandler(admission.Create, admission.Update), stopCh: stopCh, registry: registry, numEvaluators: numEvaluators, config: config, + quotaAccessor: quotaAccessor, + evaluator: NewQuotaEvaluator(quotaAccessor, registry, nil, config, numEvaluators, stopCh), }, nil } func (a *quotaAdmission) SetInternalClientSet(client internalclientset.Interface) { - var err error - quotaAccessor, err := newQuotaAccessor(client) - if err != nil { - // TODO handle errors more cleanly - panic(err) - } - go quotaAccessor.Run(a.stopCh) + a.quotaAccessor.client = client +} - a.evaluator = NewQuotaEvaluator(quotaAccessor, a.registry, nil, a.config, a.numEvaluators, a.stopCh) +func (a *quotaAdmission) SetInformerFactory(f informers.SharedInformerFactory) { + a.quotaAccessor.lister = f.Core().InternalVersion().ResourceQuotas().Lister() } // Validate ensures an authorizer is set. func (a *quotaAdmission) Validate() error { + if a.quotaAccessor == nil { + return fmt.Errorf("missing quotaAccessor") + } + if a.quotaAccessor.client == nil { + return fmt.Errorf("missing quotaAccessor.client") + } + if a.quotaAccessor.lister == nil { + return fmt.Errorf("missing quotaAccessor.lister") + } if a.evaluator == nil { return fmt.Errorf("missing evaluator") } diff --git a/plugin/pkg/admission/resourcequota/admission_test.go b/plugin/pkg/admission/resourcequota/admission_test.go index 0bbc6b2df5b..4fa876c01a7 100644 --- a/plugin/pkg/admission/resourcequota/admission_test.go +++ b/plugin/pkg/admission/resourcequota/admission_test.go @@ -33,6 +33,8 @@ import ( "k8s.io/client-go/tools/cache" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" + informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion" + "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/quota" "k8s.io/kubernetes/pkg/quota/generic" "k8s.io/kubernetes/pkg/quota/install" @@ -123,14 +125,14 @@ func TestPrettyPrint(t *testing.T) { // TestAdmissionIgnoresDelete verifies that the admission controller ignores delete operations func TestAdmissionIgnoresDelete(t *testing.T) { - kubeClient := fake.NewSimpleClientset() - indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc}) stopCh := make(chan struct{}) defer close(stopCh) - quotaAccessor, _ := newQuotaAccessor(kubeClient) - quotaAccessor.indexer = indexer - go quotaAccessor.Run(stopCh) + kubeClient := fake.NewSimpleClientset() + informerFactory := informers.NewSharedInformerFactory(kubeClient, controller.NoResyncPeriodFunc()) + quotaAccessor, _ := newQuotaAccessor() + quotaAccessor.client = kubeClient + quotaAccessor.lister = informerFactory.Core().InternalVersion().ResourceQuotas().Lister() config := &resourcequotaapi.Configuration{} evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(nil, nil), nil, config, 5, stopCh) @@ -158,14 +160,14 @@ func TestAdmissionIgnoresSubresources(t *testing.T) { } resourceQuota.Status.Hard[api.ResourceMemory] = resource.MustParse("2Gi") resourceQuota.Status.Used[api.ResourceMemory] = resource.MustParse("1Gi") - kubeClient := fake.NewSimpleClientset(resourceQuota) - indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc}) stopCh := make(chan struct{}) defer close(stopCh) - quotaAccessor, _ := newQuotaAccessor(kubeClient) - quotaAccessor.indexer = indexer - go quotaAccessor.Run(stopCh) + kubeClient := fake.NewSimpleClientset() + informerFactory := informers.NewSharedInformerFactory(kubeClient, controller.NoResyncPeriodFunc()) + quotaAccessor, _ := newQuotaAccessor() + quotaAccessor.client = kubeClient + quotaAccessor.lister = informerFactory.Core().InternalVersion().ResourceQuotas().Lister() config := &resourcequotaapi.Configuration{} evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(nil, nil), nil, config, 5, stopCh) @@ -173,7 +175,7 @@ func TestAdmissionIgnoresSubresources(t *testing.T) { Handler: admission.NewHandler(admission.Create, admission.Update), evaluator: evaluator, } - indexer.Add(resourceQuota) + informerFactory.Core().InternalVersion().ResourceQuotas().Informer().GetIndexer().Add(resourceQuota) newPod := validPod("123", 1, getResourceRequirements(getResourceList("100m", "2Gi"), getResourceList("", ""))) err := handler.Admit(admission.NewAttributesRecord(newPod, nil, api.Kind("Pod").WithVersion("version"), newPod.Namespace, newPod.Name, api.Resource("pods").WithVersion("version"), "", admission.Create, nil)) if err == nil { @@ -202,14 +204,14 @@ func TestAdmitBelowQuotaLimit(t *testing.T) { }, }, } - kubeClient := fake.NewSimpleClientset(resourceQuota) - indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc}) stopCh := make(chan struct{}) defer close(stopCh) - quotaAccessor, _ := newQuotaAccessor(kubeClient) - quotaAccessor.indexer = indexer - go quotaAccessor.Run(stopCh) + kubeClient := fake.NewSimpleClientset(resourceQuota) + informerFactory := informers.NewSharedInformerFactory(kubeClient, controller.NoResyncPeriodFunc()) + quotaAccessor, _ := newQuotaAccessor() + quotaAccessor.client = kubeClient + quotaAccessor.lister = informerFactory.Core().InternalVersion().ResourceQuotas().Lister() config := &resourcequotaapi.Configuration{} evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(nil, nil), nil, config, 5, stopCh) @@ -217,7 +219,7 @@ func TestAdmitBelowQuotaLimit(t *testing.T) { Handler: admission.NewHandler(admission.Create, admission.Update), evaluator: evaluator, } - indexer.Add(resourceQuota) + informerFactory.Core().InternalVersion().ResourceQuotas().Informer().GetIndexer().Add(resourceQuota) newPod := validPod("allowed-pod", 1, getResourceRequirements(getResourceList("100m", "2Gi"), getResourceList("", ""))) err := handler.Admit(admission.NewAttributesRecord(newPod, nil, api.Kind("Pod").WithVersion("version"), newPod.Namespace, newPod.Name, api.Resource("pods").WithVersion("version"), "", admission.Create, nil)) if err != nil { @@ -285,14 +287,14 @@ func TestAdmitHandlesOldObjects(t *testing.T) { } // start up quota system - kubeClient := fake.NewSimpleClientset(resourceQuota) - indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc}) stopCh := make(chan struct{}) defer close(stopCh) - quotaAccessor, _ := newQuotaAccessor(kubeClient) - quotaAccessor.indexer = indexer - go quotaAccessor.Run(stopCh) + kubeClient := fake.NewSimpleClientset(resourceQuota) + informerFactory := informers.NewSharedInformerFactory(kubeClient, controller.NoResyncPeriodFunc()) + quotaAccessor, _ := newQuotaAccessor() + quotaAccessor.client = kubeClient + quotaAccessor.lister = informerFactory.Core().InternalVersion().ResourceQuotas().Lister() config := &resourcequotaapi.Configuration{} evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(nil, nil), nil, config, 5, stopCh) @@ -300,7 +302,7 @@ func TestAdmitHandlesOldObjects(t *testing.T) { Handler: admission.NewHandler(admission.Create, admission.Update), evaluator: evaluator, } - indexer.Add(resourceQuota) + informerFactory.Core().InternalVersion().ResourceQuotas().Informer().GetIndexer().Add(resourceQuota) // old service was a load balancer, but updated version is a node port. existingService := &api.Service{ @@ -382,14 +384,14 @@ func TestAdmitHandlesCreatingUpdates(t *testing.T) { } // start up quota system - kubeClient := fake.NewSimpleClientset(resourceQuota) - indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc}) stopCh := make(chan struct{}) defer close(stopCh) - quotaAccessor, _ := newQuotaAccessor(kubeClient) - quotaAccessor.indexer = indexer - go quotaAccessor.Run(stopCh) + kubeClient := fake.NewSimpleClientset(resourceQuota) + informerFactory := informers.NewSharedInformerFactory(kubeClient, controller.NoResyncPeriodFunc()) + quotaAccessor, _ := newQuotaAccessor() + quotaAccessor.client = kubeClient + quotaAccessor.lister = informerFactory.Core().InternalVersion().ResourceQuotas().Lister() config := &resourcequotaapi.Configuration{} evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(nil, nil), nil, config, 5, stopCh) @@ -397,7 +399,7 @@ func TestAdmitHandlesCreatingUpdates(t *testing.T) { Handler: admission.NewHandler(admission.Create, admission.Update), evaluator: evaluator, } - indexer.Add(resourceQuota) + informerFactory.Core().InternalVersion().ResourceQuotas().Informer().GetIndexer().Add(resourceQuota) // old service didn't exist, so this update is actually a create oldService := &api.Service{ @@ -476,14 +478,14 @@ func TestAdmitExceedQuotaLimit(t *testing.T) { }, }, } - kubeClient := fake.NewSimpleClientset(resourceQuota) - indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc}) stopCh := make(chan struct{}) defer close(stopCh) - quotaAccessor, _ := newQuotaAccessor(kubeClient) - quotaAccessor.indexer = indexer - go quotaAccessor.Run(stopCh) + kubeClient := fake.NewSimpleClientset(resourceQuota) + informerFactory := informers.NewSharedInformerFactory(kubeClient, controller.NoResyncPeriodFunc()) + quotaAccessor, _ := newQuotaAccessor() + quotaAccessor.client = kubeClient + quotaAccessor.lister = informerFactory.Core().InternalVersion().ResourceQuotas().Lister() config := &resourcequotaapi.Configuration{} evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(nil, nil), nil, config, 5, stopCh) @@ -491,7 +493,7 @@ func TestAdmitExceedQuotaLimit(t *testing.T) { Handler: admission.NewHandler(admission.Create, admission.Update), evaluator: evaluator, } - indexer.Add(resourceQuota) + informerFactory.Core().InternalVersion().ResourceQuotas().Informer().GetIndexer().Add(resourceQuota) newPod := validPod("not-allowed-pod", 1, getResourceRequirements(getResourceList("3", "2Gi"), getResourceList("", ""))) err := handler.Admit(admission.NewAttributesRecord(newPod, nil, api.Kind("Pod").WithVersion("version"), newPod.Namespace, newPod.Name, api.Resource("pods").WithVersion("version"), "", admission.Create, nil)) if err == nil { @@ -520,23 +522,22 @@ func TestAdmitEnforceQuotaConstraints(t *testing.T) { }, }, } - kubeClient := fake.NewSimpleClientset(resourceQuota) - indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc}) stopCh := make(chan struct{}) defer close(stopCh) - quotaAccessor, _ := newQuotaAccessor(kubeClient) - quotaAccessor.indexer = indexer - go quotaAccessor.Run(stopCh) + kubeClient := fake.NewSimpleClientset(resourceQuota) + informerFactory := informers.NewSharedInformerFactory(kubeClient, controller.NoResyncPeriodFunc()) + quotaAccessor, _ := newQuotaAccessor() + quotaAccessor.client = kubeClient + quotaAccessor.lister = informerFactory.Core().InternalVersion().ResourceQuotas().Lister() config := &resourcequotaapi.Configuration{} - evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(nil, nil), nil, config, 5, stopCh) handler := "aAdmission{ Handler: admission.NewHandler(admission.Create, admission.Update), evaluator: evaluator, } - indexer.Add(resourceQuota) + informerFactory.Core().InternalVersion().ResourceQuotas().Informer().GetIndexer().Add(resourceQuota) // verify all values are specified as required on the quota newPod := validPod("not-allowed-pod", 1, getResourceRequirements(getResourceList("100m", "2Gi"), getResourceList("200m", ""))) err := handler.Admit(admission.NewAttributesRecord(newPod, nil, api.Kind("Pod").WithVersion("version"), newPod.Namespace, newPod.Name, api.Resource("pods").WithVersion("version"), "", admission.Create, nil)) @@ -570,8 +571,6 @@ func TestAdmitPodInNamespaceWithoutQuota(t *testing.T) { }, }, } - kubeClient := fake.NewSimpleClientset(resourceQuota) - indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc}) liveLookupCache, err := lru.New(100) if err != nil { t.Fatal(err) @@ -579,10 +578,12 @@ func TestAdmitPodInNamespaceWithoutQuota(t *testing.T) { stopCh := make(chan struct{}) defer close(stopCh) - quotaAccessor, _ := newQuotaAccessor(kubeClient) - quotaAccessor.indexer = indexer + kubeClient := fake.NewSimpleClientset(resourceQuota) + informerFactory := informers.NewSharedInformerFactory(kubeClient, controller.NoResyncPeriodFunc()) + quotaAccessor, _ := newQuotaAccessor() + quotaAccessor.client = kubeClient + quotaAccessor.lister = informerFactory.Core().InternalVersion().ResourceQuotas().Lister() quotaAccessor.liveLookupCache = liveLookupCache - go quotaAccessor.Run(stopCh) config := &resourcequotaapi.Configuration{} evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(nil, nil), nil, config, 5, stopCh) @@ -591,7 +592,7 @@ func TestAdmitPodInNamespaceWithoutQuota(t *testing.T) { evaluator: evaluator, } // Add to the index - indexer.Add(resourceQuota) + informerFactory.Core().InternalVersion().ResourceQuotas().Informer().GetIndexer().Add(resourceQuota) newPod := validPod("not-allowed-pod", 1, getResourceRequirements(getResourceList("100m", "2Gi"), getResourceList("200m", ""))) // Add to the lru cache so we do not do a live client lookup liveLookupCache.Add(newPod.Namespace, liveLookupEntry{expiry: time.Now().Add(time.Duration(30 * time.Second)), items: []*api.ResourceQuota{}}) @@ -641,14 +642,14 @@ func TestAdmitBelowTerminatingQuotaLimit(t *testing.T) { }, }, } - kubeClient := fake.NewSimpleClientset(resourceQuotaTerminating, resourceQuotaNonTerminating) - indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc}) stopCh := make(chan struct{}) defer close(stopCh) - quotaAccessor, _ := newQuotaAccessor(kubeClient) - quotaAccessor.indexer = indexer - go quotaAccessor.Run(stopCh) + kubeClient := fake.NewSimpleClientset(resourceQuotaTerminating, resourceQuotaNonTerminating) + informerFactory := informers.NewSharedInformerFactory(kubeClient, controller.NoResyncPeriodFunc()) + quotaAccessor, _ := newQuotaAccessor() + quotaAccessor.client = kubeClient + quotaAccessor.lister = informerFactory.Core().InternalVersion().ResourceQuotas().Lister() config := &resourcequotaapi.Configuration{} evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(nil, nil), nil, config, 5, stopCh) @@ -656,8 +657,8 @@ func TestAdmitBelowTerminatingQuotaLimit(t *testing.T) { Handler: admission.NewHandler(admission.Create, admission.Update), evaluator: evaluator, } - indexer.Add(resourceQuotaNonTerminating) - indexer.Add(resourceQuotaTerminating) + informerFactory.Core().InternalVersion().ResourceQuotas().Informer().GetIndexer().Add(resourceQuotaNonTerminating) + informerFactory.Core().InternalVersion().ResourceQuotas().Informer().GetIndexer().Add(resourceQuotaTerminating) // create a pod that has an active deadline newPod := validPod("allowed-pod", 1, getResourceRequirements(getResourceList("100m", "2Gi"), getResourceList("", ""))) @@ -746,14 +747,14 @@ func TestAdmitBelowBestEffortQuotaLimit(t *testing.T) { }, }, } - kubeClient := fake.NewSimpleClientset(resourceQuotaBestEffort, resourceQuotaNotBestEffort) - indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc}) stopCh := make(chan struct{}) defer close(stopCh) - quotaAccessor, _ := newQuotaAccessor(kubeClient) - quotaAccessor.indexer = indexer - go quotaAccessor.Run(stopCh) + kubeClient := fake.NewSimpleClientset(resourceQuotaBestEffort, resourceQuotaNotBestEffort) + informerFactory := informers.NewSharedInformerFactory(kubeClient, controller.NoResyncPeriodFunc()) + quotaAccessor, _ := newQuotaAccessor() + quotaAccessor.client = kubeClient + quotaAccessor.lister = informerFactory.Core().InternalVersion().ResourceQuotas().Lister() config := &resourcequotaapi.Configuration{} evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(nil, nil), nil, config, 5, stopCh) @@ -761,8 +762,8 @@ func TestAdmitBelowBestEffortQuotaLimit(t *testing.T) { Handler: admission.NewHandler(admission.Create, admission.Update), evaluator: evaluator, } - indexer.Add(resourceQuotaBestEffort) - indexer.Add(resourceQuotaNotBestEffort) + informerFactory.Core().InternalVersion().ResourceQuotas().Informer().GetIndexer().Add(resourceQuotaBestEffort) + informerFactory.Core().InternalVersion().ResourceQuotas().Informer().GetIndexer().Add(resourceQuotaNotBestEffort) // create a pod that is best effort because it does not make a request for anything newPod := validPod("allowed-pod", 1, getResourceRequirements(getResourceList("", ""), getResourceList("", ""))) @@ -838,14 +839,14 @@ func TestAdmitBestEffortQuotaLimitIgnoresBurstable(t *testing.T) { }, }, } - kubeClient := fake.NewSimpleClientset(resourceQuota) - indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc}) stopCh := make(chan struct{}) defer close(stopCh) - quotaAccessor, _ := newQuotaAccessor(kubeClient) - quotaAccessor.indexer = indexer - go quotaAccessor.Run(stopCh) + kubeClient := fake.NewSimpleClientset(resourceQuota) + informerFactory := informers.NewSharedInformerFactory(kubeClient, controller.NoResyncPeriodFunc()) + quotaAccessor, _ := newQuotaAccessor() + quotaAccessor.client = kubeClient + quotaAccessor.lister = informerFactory.Core().InternalVersion().ResourceQuotas().Lister() config := &resourcequotaapi.Configuration{} evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(nil, nil), nil, config, 5, stopCh) @@ -853,7 +854,7 @@ func TestAdmitBestEffortQuotaLimitIgnoresBurstable(t *testing.T) { Handler: admission.NewHandler(admission.Create, admission.Update), evaluator: evaluator, } - indexer.Add(resourceQuota) + informerFactory.Core().InternalVersion().ResourceQuotas().Informer().GetIndexer().Add(resourceQuota) newPod := validPod("allowed-pod", 1, getResourceRequirements(getResourceList("100m", "1Gi"), getResourceList("", ""))) err := handler.Admit(admission.NewAttributesRecord(newPod, nil, api.Kind("Pod").WithVersion("version"), newPod.Namespace, newPod.Name, api.Resource("pods").WithVersion("version"), "", admission.Create, nil)) if err != nil { @@ -922,8 +923,6 @@ func TestAdmissionSetsMissingNamespace(t *testing.T) { }, }, } - kubeClient := fake.NewSimpleClientset(resourceQuota) - indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc}) // create a dummy evaluator so we can trigger quota podEvaluator := &generic.ObjectCountEvaluator{ @@ -939,9 +938,11 @@ func TestAdmissionSetsMissingNamespace(t *testing.T) { stopCh := make(chan struct{}) defer close(stopCh) - quotaAccessor, _ := newQuotaAccessor(kubeClient) - quotaAccessor.indexer = indexer - go quotaAccessor.Run(stopCh) + kubeClient := fake.NewSimpleClientset(resourceQuota) + informerFactory := informers.NewSharedInformerFactory(kubeClient, controller.NoResyncPeriodFunc()) + quotaAccessor, _ := newQuotaAccessor() + quotaAccessor.client = kubeClient + quotaAccessor.lister = informerFactory.Core().InternalVersion().ResourceQuotas().Lister() config := &resourcequotaapi.Configuration{} evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(nil, nil), nil, config, 5, stopCh) evaluator.(*quotaEvaluator).registry = registry @@ -950,7 +951,7 @@ func TestAdmissionSetsMissingNamespace(t *testing.T) { Handler: admission.NewHandler(admission.Create, admission.Update), evaluator: evaluator, } - indexer.Add(resourceQuota) + informerFactory.Core().InternalVersion().ResourceQuotas().Informer().GetIndexer().Add(resourceQuota) newPod := validPod("pod-without-namespace", 1, getResourceRequirements(getResourceList("1", "2Gi"), getResourceList("", ""))) // unset the namespace @@ -980,14 +981,14 @@ func TestAdmitRejectsNegativeUsage(t *testing.T) { }, }, } - kubeClient := fake.NewSimpleClientset(resourceQuota) - indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc}) stopCh := make(chan struct{}) defer close(stopCh) - quotaAccessor, _ := newQuotaAccessor(kubeClient) - quotaAccessor.indexer = indexer - go quotaAccessor.Run(stopCh) + kubeClient := fake.NewSimpleClientset(resourceQuota) + informerFactory := informers.NewSharedInformerFactory(kubeClient, controller.NoResyncPeriodFunc()) + quotaAccessor, _ := newQuotaAccessor() + quotaAccessor.client = kubeClient + quotaAccessor.lister = informerFactory.Core().InternalVersion().ResourceQuotas().Lister() config := &resourcequotaapi.Configuration{} evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(nil, nil), nil, config, 5, stopCh) @@ -995,7 +996,7 @@ func TestAdmitRejectsNegativeUsage(t *testing.T) { Handler: admission.NewHandler(admission.Create, admission.Update), evaluator: evaluator, } - indexer.Add(resourceQuota) + informerFactory.Core().InternalVersion().ResourceQuotas().Informer().GetIndexer().Add(resourceQuota) // verify quota rejects negative pvc storage requests newPvc := validPersistentVolumeClaim("not-allowed-pvc", getResourceRequirements(api.ResourceList{api.ResourceStorage: resource.MustParse("-1Gi")}, api.ResourceList{})) err := handler.Admit(admission.NewAttributesRecord(newPvc, nil, api.Kind("PersistentVolumeClaim").WithVersion("version"), newPvc.Namespace, newPvc.Name, api.Resource("persistentvolumeclaims").WithVersion("version"), "", admission.Create, nil)) @@ -1026,14 +1027,14 @@ func TestAdmitWhenUnrelatedResourceExceedsQuota(t *testing.T) { }, }, } - kubeClient := fake.NewSimpleClientset(resourceQuota) - indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc}) stopCh := make(chan struct{}) defer close(stopCh) - quotaAccessor, _ := newQuotaAccessor(kubeClient) - quotaAccessor.indexer = indexer - go quotaAccessor.Run(stopCh) + kubeClient := fake.NewSimpleClientset(resourceQuota) + informerFactory := informers.NewSharedInformerFactory(kubeClient, controller.NoResyncPeriodFunc()) + quotaAccessor, _ := newQuotaAccessor() + quotaAccessor.client = kubeClient + quotaAccessor.lister = informerFactory.Core().InternalVersion().ResourceQuotas().Lister() config := &resourcequotaapi.Configuration{} evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(nil, nil), nil, config, 5, stopCh) @@ -1041,7 +1042,7 @@ func TestAdmitWhenUnrelatedResourceExceedsQuota(t *testing.T) { Handler: admission.NewHandler(admission.Create, admission.Update), evaluator: evaluator, } - indexer.Add(resourceQuota) + informerFactory.Core().InternalVersion().ResourceQuotas().Informer().GetIndexer().Add(resourceQuota) // create a pod that should pass existing quota newPod := validPod("allowed-pod", 1, getResourceRequirements(getResourceList("", ""), getResourceList("", ""))) @@ -1054,13 +1055,13 @@ func TestAdmitWhenUnrelatedResourceExceedsQuota(t *testing.T) { // TestAdmitLimitedResourceNoQuota verifies if a limited resource is configured with no quota, it cannot be consumed. func TestAdmitLimitedResourceNoQuota(t *testing.T) { kubeClient := fake.NewSimpleClientset() - indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc}) stopCh := make(chan struct{}) defer close(stopCh) - quotaAccessor, _ := newQuotaAccessor(kubeClient) - quotaAccessor.indexer = indexer - go quotaAccessor.Run(stopCh) + informerFactory := informers.NewSharedInformerFactory(kubeClient, controller.NoResyncPeriodFunc()) + quotaAccessor, _ := newQuotaAccessor() + quotaAccessor.client = kubeClient + quotaAccessor.lister = informerFactory.Core().InternalVersion().ResourceQuotas().Lister() // disable consumption of cpu unless there is a covering quota. config := &resourcequotaapi.Configuration{ @@ -1087,13 +1088,13 @@ func TestAdmitLimitedResourceNoQuota(t *testing.T) { // TestAdmitLimitedResourceNoQuotaIgnoresNonMatchingResources shows it ignores non matching resources in config. func TestAdmitLimitedResourceNoQuotaIgnoresNonMatchingResources(t *testing.T) { kubeClient := fake.NewSimpleClientset() - indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc}) stopCh := make(chan struct{}) defer close(stopCh) - quotaAccessor, _ := newQuotaAccessor(kubeClient) - quotaAccessor.indexer = indexer - go quotaAccessor.Run(stopCh) + informerFactory := informers.NewSharedInformerFactory(kubeClient, controller.NoResyncPeriodFunc()) + quotaAccessor, _ := newQuotaAccessor() + quotaAccessor.client = kubeClient + quotaAccessor.lister = informerFactory.Core().InternalVersion().ResourceQuotas().Lister() // disable consumption of cpu unless there is a covering quota. config := &resourcequotaapi.Configuration{ @@ -1135,9 +1136,10 @@ func TestAdmitLimitedResourceWithQuota(t *testing.T) { stopCh := make(chan struct{}) defer close(stopCh) - quotaAccessor, _ := newQuotaAccessor(kubeClient) - quotaAccessor.indexer = indexer - go quotaAccessor.Run(stopCh) + informerFactory := informers.NewSharedInformerFactory(kubeClient, controller.NoResyncPeriodFunc()) + quotaAccessor, _ := newQuotaAccessor() + quotaAccessor.client = kubeClient + quotaAccessor.lister = informerFactory.Core().InternalVersion().ResourceQuotas().Lister() // disable consumption of cpu unless there is a covering quota. // disable consumption of cpu unless there is a covering quota. @@ -1192,9 +1194,10 @@ func TestAdmitLimitedResourceWithMultipleQuota(t *testing.T) { stopCh := make(chan struct{}) defer close(stopCh) - quotaAccessor, _ := newQuotaAccessor(kubeClient) - quotaAccessor.indexer = indexer - go quotaAccessor.Run(stopCh) + informerFactory := informers.NewSharedInformerFactory(kubeClient, controller.NoResyncPeriodFunc()) + quotaAccessor, _ := newQuotaAccessor() + quotaAccessor.client = kubeClient + quotaAccessor.lister = informerFactory.Core().InternalVersion().ResourceQuotas().Lister() // disable consumption of cpu unless there is a covering quota. // disable consumption of cpu unless there is a covering quota. @@ -1239,9 +1242,10 @@ func TestAdmitLimitedResourceWithQuotaThatDoesNotCover(t *testing.T) { stopCh := make(chan struct{}) defer close(stopCh) - quotaAccessor, _ := newQuotaAccessor(kubeClient) - quotaAccessor.indexer = indexer - go quotaAccessor.Run(stopCh) + informerFactory := informers.NewSharedInformerFactory(kubeClient, controller.NoResyncPeriodFunc()) + quotaAccessor, _ := newQuotaAccessor() + quotaAccessor.client = kubeClient + quotaAccessor.lister = informerFactory.Core().InternalVersion().ResourceQuotas().Lister() // disable consumption of cpu unless there is a covering quota. // disable consumption of cpu unless there is a covering quota. diff --git a/plugin/pkg/admission/resourcequota/resource_access.go b/plugin/pkg/admission/resourcequota/resource_access.go index a74b2049fca..8f2fb39c3bb 100644 --- a/plugin/pkg/admission/resourcequota/resource_access.go +++ b/plugin/pkg/admission/resourcequota/resource_access.go @@ -20,17 +20,14 @@ import ( "fmt" "time" - "github.com/golang/glog" lru "github.com/hashicorp/golang-lru" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - utilruntime "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apimachinery/pkg/watch" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apiserver/pkg/storage/etcd" - "k8s.io/client-go/tools/cache" "k8s.io/kubernetes/pkg/api" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" + corelisters "k8s.io/kubernetes/pkg/client/listers/core/internalversion" ) // QuotaAccessor abstracts the get/set logic from the rest of the Evaluator. This could be a test stub, a straight passthrough, @@ -47,9 +44,8 @@ type QuotaAccessor interface { type quotaAccessor struct { client clientset.Interface - // indexer that holds quota objects by namespace - indexer cache.Indexer - reflector *cache.Reflector + // lister can list/get quota objects from a shared informer's cache + lister corelisters.ResourceQuotaLister // liveLookups holds the last few live lookups we've done to help ammortize cost on repeated lookup failures. // This let's us handle the case of latent caches, by looking up actual results for a namespace on cache miss/no results. @@ -63,7 +59,7 @@ type quotaAccessor struct { } // newQuotaAccessor creates an object that conforms to the QuotaAccessor interface to be used to retrieve quota objects. -func newQuotaAccessor(client clientset.Interface) (*quotaAccessor, error) { +func newQuotaAccessor() (*quotaAccessor, error) { liveLookupCache, err := lru.New(100) if err != nil { return nil, err @@ -72,36 +68,15 @@ func newQuotaAccessor(client clientset.Interface) (*quotaAccessor, error) { if err != nil { return nil, err } - lw := &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - return client.Core().ResourceQuotas(metav1.NamespaceAll).List(options) - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - return client.Core().ResourceQuotas(metav1.NamespaceAll).Watch(options) - }, - } - indexer, reflector := cache.NewNamespaceKeyedIndexerAndReflector(lw, &api.ResourceQuota{}, 0) + // client and lister will be set when SetInternalClientSet and SetInformerFactory are invoked return "aAccessor{ - client: client, - indexer: indexer, - reflector: reflector, liveLookupCache: liveLookupCache, liveTTL: time.Duration(30 * time.Second), updatedQuotas: updatedCache, }, nil } -// Run begins watching and syncing. -func (e *quotaAccessor) Run(stopCh <-chan struct{}) { - defer utilruntime.HandleCrash() - - e.reflector.RunUntil(stopCh) - - <-stopCh - glog.Infof("Shutting down quota accessor") -} - func (e *quotaAccessor) UpdateQuotaStatus(newQuota *api.ResourceQuota) error { updatedQuota, err := e.client.Core().ResourceQuotas(newQuota.Namespace).UpdateStatus(newQuota) if err != nil { @@ -136,9 +111,9 @@ func (e *quotaAccessor) checkCache(quota *api.ResourceQuota) *api.ResourceQuota func (e *quotaAccessor) GetQuotas(namespace string) ([]api.ResourceQuota, error) { // determine if there are any quotas in this namespace // if there are no quotas, we don't need to do anything - items, err := e.indexer.Index("namespace", &api.ResourceQuota{ObjectMeta: metav1.ObjectMeta{Namespace: namespace, Name: ""}}) + items, err := e.lister.ResourceQuotas(namespace).List(labels.Everything()) if err != nil { - return nil, fmt.Errorf("error resolving quota.") + return nil, fmt.Errorf("error resolving quota: %v", err) } // if there are no items held in our indexer, check our live-lookup LRU, if that misses, do the live lookup to prime it. @@ -169,7 +144,7 @@ func (e *quotaAccessor) GetQuotas(namespace string) ([]api.ResourceQuota, error) resourceQuotas := []api.ResourceQuota{} for i := range items { - quota := items[i].(*api.ResourceQuota) + quota := items[i] quota = e.checkCache(quota) // always make a copy. We're going to muck around with this and we should never mutate the originals resourceQuotas = append(resourceQuotas, *quota) diff --git a/plugin/pkg/admission/security/podsecuritypolicy/BUILD b/plugin/pkg/admission/security/podsecuritypolicy/BUILD index 9ad8aec8270..2ccef94ab34 100644 --- a/plugin/pkg/admission/security/podsecuritypolicy/BUILD +++ b/plugin/pkg/admission/security/podsecuritypolicy/BUILD @@ -15,7 +15,8 @@ go_library( deps = [ "//pkg/api:go_default_library", "//pkg/apis/extensions:go_default_library", - "//pkg/client/clientset_generated/internalclientset:go_default_library", + "//pkg/client/informers/informers_generated/internalversion:go_default_library", + "//pkg/client/listers/extensions/internalversion:go_default_library", "//pkg/kubeapiserver/admission:go_default_library", "//pkg/security/podsecuritypolicy:go_default_library", "//pkg/security/podsecuritypolicy/util:go_default_library", @@ -23,15 +24,11 @@ go_library( "//pkg/serviceaccount:go_default_library", "//pkg/util/maps:go_default_library", "//vendor:github.com/golang/glog", - "//vendor:k8s.io/apimachinery/pkg/api/errors", - "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", - "//vendor:k8s.io/apimachinery/pkg/runtime", + "//vendor:k8s.io/apimachinery/pkg/labels", "//vendor:k8s.io/apimachinery/pkg/util/validation/field", - "//vendor:k8s.io/apimachinery/pkg/watch", "//vendor:k8s.io/apiserver/pkg/admission", "//vendor:k8s.io/apiserver/pkg/authentication/user", "//vendor:k8s.io/apiserver/pkg/authorization/authorizer", - "//vendor:k8s.io/client-go/tools/cache", ], ) @@ -43,6 +40,9 @@ go_test( deps = [ "//pkg/api:go_default_library", "//pkg/apis/extensions:go_default_library", + "//pkg/client/informers/informers_generated/internalversion:go_default_library", + "//pkg/client/listers/extensions/internalversion:go_default_library", + "//pkg/controller:go_default_library", "//pkg/security/apparmor:go_default_library", "//pkg/security/podsecuritypolicy:go_default_library", "//pkg/security/podsecuritypolicy/seccomp:go_default_library", @@ -54,7 +54,6 @@ go_test( "//vendor:k8s.io/apiserver/pkg/admission", "//vendor:k8s.io/apiserver/pkg/authentication/user", "//vendor:k8s.io/apiserver/pkg/authorization/authorizer", - "//vendor:k8s.io/client-go/tools/cache", ], ) diff --git a/plugin/pkg/admission/security/podsecuritypolicy/admission.go b/plugin/pkg/admission/security/podsecuritypolicy/admission.go index 281a8b1614e..c8a053d413e 100644 --- a/plugin/pkg/admission/security/podsecuritypolicy/admission.go +++ b/plugin/pkg/admission/security/podsecuritypolicy/admission.go @@ -23,18 +23,15 @@ import ( "github.com/golang/glog" - "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/validation/field" - "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/admission" "k8s.io/apiserver/pkg/authentication/user" "k8s.io/apiserver/pkg/authorization/authorizer" - "k8s.io/client-go/tools/cache" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/apis/extensions" - "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" + informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion" + extensionslisters "k8s.io/kubernetes/pkg/client/listers/extensions/internalversion" kubeapiserveradmission "k8s.io/kubernetes/pkg/kubeapiserver/admission" psp "k8s.io/kubernetes/pkg/security/podsecuritypolicy" psputil "k8s.io/kubernetes/pkg/security/podsecuritypolicy/util" @@ -55,7 +52,7 @@ func init() { } // PSPMatchFn allows plugging in how PSPs are matched against user information. -type PSPMatchFn func(store cache.Store, user user.Info, sa user.Info, authz authorizer.Authorizer) ([]*extensions.PodSecurityPolicy, error) +type PSPMatchFn func(lister extensionslisters.PodSecurityPolicyLister, user user.Info, sa user.Info, authz authorizer.Authorizer) ([]*extensions.PodSecurityPolicy, error) // podSecurityPolicyPlugin holds state for and implements the admission plugin. type podSecurityPolicyPlugin struct { @@ -64,10 +61,7 @@ type podSecurityPolicyPlugin struct { pspMatcher PSPMatchFn failOnNoPolicies bool authz authorizer.Authorizer - - reflector *cache.Reflector - stopChan chan struct{} - store cache.Store + lister extensionslisters.PodSecurityPolicyLister } // SetAuthorizer sets the authorizer. @@ -80,22 +74,18 @@ func (plugin *podSecurityPolicyPlugin) Validate() error { if plugin.authz == nil { return fmt.Errorf("%s requires an authorizer", PluginName) } - if plugin.store == nil { - return fmt.Errorf("%s requires an client", PluginName) - } - if plugin.store == nil { - return fmt.Errorf("%s requires an client", PluginName) + if plugin.lister == nil { + return fmt.Errorf("%s requires a lister", PluginName) } return nil } var _ admission.Interface = &podSecurityPolicyPlugin{} var _ kubeapiserveradmission.WantsAuthorizer = &podSecurityPolicyPlugin{} -var _ kubeapiserveradmission.WantsInternalClientSet = &podSecurityPolicyPlugin{} +var _ kubeapiserveradmission.WantsInformerFactory = &podSecurityPolicyPlugin{} // NewPlugin creates a new PSP admission plugin. func NewPlugin(strategyFactory psp.StrategyFactory, pspMatcher PSPMatchFn, failOnNoPolicies bool) *podSecurityPolicyPlugin { - return &podSecurityPolicyPlugin{ Handler: admission.NewHandler(admission.Create, admission.Update), strategyFactory: strategyFactory, @@ -104,35 +94,10 @@ func NewPlugin(strategyFactory psp.StrategyFactory, pspMatcher PSPMatchFn, failO } } -func (a *podSecurityPolicyPlugin) SetInternalClientSet(client internalclientset.Interface) { - a.store = cache.NewStore(cache.MetaNamespaceKeyFunc) - a.reflector = cache.NewReflector( - &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - return client.Extensions().PodSecurityPolicies().List(options) - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - return client.Extensions().PodSecurityPolicies().Watch(options) - }, - }, - &extensions.PodSecurityPolicy{}, - a.store, - 0, - ) - a.Run() -} - -func (a *podSecurityPolicyPlugin) Run() { - if a.stopChan == nil { - a.stopChan = make(chan struct{}) - } - a.reflector.RunUntil(a.stopChan) -} -func (a *podSecurityPolicyPlugin) Stop() { - if a.stopChan != nil { - close(a.stopChan) - a.stopChan = nil - } +func (a *podSecurityPolicyPlugin) SetInformerFactory(f informers.SharedInformerFactory) { + podSecurityPolicyInformer := f.Extensions().InternalVersion().PodSecurityPolicies() + a.lister = podSecurityPolicyInformer.Lister() + a.SetReadyFunc(podSecurityPolicyInformer.Informer().HasSynced) } // Admit determines if the pod should be admitted based on the requested security context @@ -165,7 +130,7 @@ func (c *podSecurityPolicyPlugin) Admit(a admission.Attributes) error { saInfo = serviceaccount.UserInfo(a.GetNamespace(), pod.Spec.ServiceAccountName, "") } - matchedPolicies, err := c.pspMatcher(c.store, a.GetUserInfo(), saInfo, c.authz) + matchedPolicies, err := c.pspMatcher(c.lister, a.GetUserInfo(), saInfo, c.authz) if err != nil { return admission.NewForbidden(a, err) } @@ -308,21 +273,21 @@ func (c *podSecurityPolicyPlugin) createProvidersFromPolicies(psps []*extensions return providers, errs } -// getMatchingPolicies returns policies from the store. For now this returns everything +// getMatchingPolicies returns policies from the lister. For now this returns everything // in the future it can filter based on UserInfo and permissions. // // TODO: this will likely need optimization since the initial implementation will // always query for authorization. Needs scale testing and possibly checking against // a cache. -func getMatchingPolicies(store cache.Store, user user.Info, sa user.Info, authz authorizer.Authorizer) ([]*extensions.PodSecurityPolicy, error) { +func getMatchingPolicies(lister extensionslisters.PodSecurityPolicyLister, user user.Info, sa user.Info, authz authorizer.Authorizer) ([]*extensions.PodSecurityPolicy, error) { matchedPolicies := make([]*extensions.PodSecurityPolicy, 0) - for _, c := range store.List() { - constraint, ok := c.(*extensions.PodSecurityPolicy) - if !ok { - return nil, errors.NewInternalError(fmt.Errorf("error converting object from store to a pod security policy: %v", c)) - } + list, err := lister.List(labels.Everything()) + if err != nil { + return nil, err + } + for _, constraint := range list { if authorizedForPolicy(user, constraint, authz) || authorizedForPolicy(sa, constraint, authz) { matchedPolicies = append(matchedPolicies, constraint) } diff --git a/plugin/pkg/admission/security/podsecuritypolicy/admission_test.go b/plugin/pkg/admission/security/podsecuritypolicy/admission_test.go index 2c3125c982f..abd6979d2f5 100644 --- a/plugin/pkg/admission/security/podsecuritypolicy/admission_test.go +++ b/plugin/pkg/admission/security/podsecuritypolicy/admission_test.go @@ -30,9 +30,11 @@ import ( kadmission "k8s.io/apiserver/pkg/admission" "k8s.io/apiserver/pkg/authentication/user" "k8s.io/apiserver/pkg/authorization/authorizer" - "k8s.io/client-go/tools/cache" kapi "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/apis/extensions" + informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion" + extensionslisters "k8s.io/kubernetes/pkg/client/listers/extensions/internalversion" + "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/security/apparmor" kpsp "k8s.io/kubernetes/pkg/security/podsecuritypolicy" "k8s.io/kubernetes/pkg/security/podsecuritypolicy/seccomp" @@ -43,13 +45,13 @@ const defaultContainerName = "test-c" // NewTestAdmission provides an admission plugin with test implementations of internal structs. It uses // an authorizer that always returns true. -func NewTestAdmission(store cache.Store) kadmission.Interface { +func NewTestAdmission(lister extensionslisters.PodSecurityPolicyLister) kadmission.Interface { return &podSecurityPolicyPlugin{ Handler: kadmission.NewHandler(kadmission.Create), - store: store, strategyFactory: kpsp.NewSimpleStrategyFactory(), pspMatcher: getMatchingPolicies, authz: &TestAuthorizer{}, + lister: lister, } } @@ -1337,13 +1339,14 @@ func TestAdmitSysctls(t *testing.T) { } func testPSPAdmit(testCaseName string, psps []*extensions.PodSecurityPolicy, pod *kapi.Pod, shouldPass bool, expectedPSP string, t *testing.T) { - store := cache.NewStore(cache.MetaNamespaceKeyFunc) + informerFactory := informers.NewSharedInformerFactory(nil, controller.NoResyncPeriodFunc()) + store := informerFactory.Extensions().InternalVersion().PodSecurityPolicies().Informer().GetStore() for _, psp := range psps { store.Add(psp) } - plugin := NewTestAdmission(store) + plugin := NewTestAdmission(informerFactory.Extensions().InternalVersion().PodSecurityPolicies().Lister()) attrs := kadmission.NewAttributesRecord(pod, nil, kapi.Kind("Pod").WithVersion("version"), "namespace", "", kapi.Resource("pods").WithVersion("version"), "", kadmission.Create, &user.DefaultInfo{}) err := plugin.Admit(attrs) @@ -1505,11 +1508,8 @@ func TestCreateProvidersFromConstraints(t *testing.T) { } for k, v := range testCases { - store := cache.NewStore(cache.MetaNamespaceKeyFunc) - admit := &podSecurityPolicyPlugin{ Handler: kadmission.NewHandler(kadmission.Create, kadmission.Update), - store: store, strategyFactory: kpsp.NewSimpleStrategyFactory(), } @@ -1628,13 +1628,15 @@ func TestGetMatchingPolicies(t *testing.T) { }, } for k, v := range tests { - store := cache.NewStore(cache.MetaNamespaceKeyFunc) + informerFactory := informers.NewSharedInformerFactory(nil, controller.NoResyncPeriodFunc()) + pspInformer := informerFactory.Extensions().InternalVersion().PodSecurityPolicies() + store := pspInformer.Informer().GetStore() for _, psp := range v.inPolicies { store.Add(psp) } authz := &TestAuthorizer{disallowed: v.disallowedPolicies} - allowedPolicies, err := getMatchingPolicies(store, v.user, v.sa, authz) + allowedPolicies, err := getMatchingPolicies(pspInformer.Lister(), v.user, v.sa, authz) if err != nil { t.Errorf("%s got unexpected error %#v", k, err) continue diff --git a/plugin/pkg/admission/serviceaccount/BUILD b/plugin/pkg/admission/serviceaccount/BUILD index 0ffa3efb744..1fcaaff715f 100644 --- a/plugin/pkg/admission/serviceaccount/BUILD +++ b/plugin/pkg/admission/serviceaccount/BUILD @@ -18,19 +18,18 @@ go_library( deps = [ "//pkg/api:go_default_library", "//pkg/client/clientset_generated/internalclientset:go_default_library", + "//pkg/client/informers/informers_generated/internalversion:go_default_library", + "//pkg/client/listers/core/internalversion:go_default_library", "//pkg/kubeapiserver/admission:go_default_library", "//pkg/kubelet/types:go_default_library", "//pkg/serviceaccount:go_default_library", "//vendor:k8s.io/apimachinery/pkg/api/errors", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", - "//vendor:k8s.io/apimachinery/pkg/fields", - "//vendor:k8s.io/apimachinery/pkg/runtime", + "//vendor:k8s.io/apimachinery/pkg/labels", "//vendor:k8s.io/apimachinery/pkg/runtime/schema", "//vendor:k8s.io/apimachinery/pkg/util/sets", - "//vendor:k8s.io/apimachinery/pkg/watch", "//vendor:k8s.io/apiserver/pkg/admission", "//vendor:k8s.io/apiserver/pkg/storage/names", - "//vendor:k8s.io/client-go/tools/cache", ], ) @@ -42,6 +41,8 @@ go_test( deps = [ "//pkg/api:go_default_library", "//pkg/client/clientset_generated/internalclientset/fake:go_default_library", + "//pkg/client/informers/informers_generated/internalversion:go_default_library", + "//pkg/controller:go_default_library", "//pkg/kubelet/types:go_default_library", "//vendor:k8s.io/apimachinery/pkg/api/errors", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", diff --git a/plugin/pkg/admission/serviceaccount/admission.go b/plugin/pkg/admission/serviceaccount/admission.go index efa89a356d4..439f1f981c4 100644 --- a/plugin/pkg/admission/serviceaccount/admission.go +++ b/plugin/pkg/admission/serviceaccount/admission.go @@ -25,16 +25,15 @@ import ( "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/fields" - "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/admission" "k8s.io/apiserver/pkg/storage/names" - "k8s.io/client-go/tools/cache" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" + informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion" + corelisters "k8s.io/kubernetes/pkg/client/listers/core/internalversion" kubeapiserveradmission "k8s.io/kubernetes/pkg/kubeapiserver/admission" kubelet "k8s.io/kubernetes/pkg/kubelet/types" "k8s.io/kubernetes/pkg/serviceaccount" @@ -75,15 +74,12 @@ type serviceAccount struct { client internalclientset.Interface - serviceAccounts cache.Indexer - secrets cache.Indexer - - stopChan chan struct{} - serviceAccountsReflector *cache.Reflector - secretsReflector *cache.Reflector + serviceAccountLister corelisters.ServiceAccountLister + secretLister corelisters.SecretLister } var _ = kubeapiserveradmission.WantsInternalClientSet(&serviceAccount{}) +var _ = kubeapiserveradmission.WantsInformerFactory(&serviceAccount{}) // NewServiceAccount returns an admission.Interface implementation which limits admission of Pod CREATE requests based on the pod's ServiceAccount: // 1. If the pod does not specify a ServiceAccount, it sets the pod's ServiceAccount to "default" @@ -105,38 +101,18 @@ func NewServiceAccount() *serviceAccount { func (a *serviceAccount) SetInternalClientSet(cl internalclientset.Interface) { a.client = cl - a.serviceAccounts, a.serviceAccountsReflector = cache.NewNamespaceKeyedIndexerAndReflector( - &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - return cl.Core().ServiceAccounts(metav1.NamespaceAll).List(options) - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - return cl.Core().ServiceAccounts(metav1.NamespaceAll).Watch(options) - }, - }, - &api.ServiceAccount{}, - 0, - ) +} - tokenSelector := fields.SelectorFromSet(map[string]string{api.SecretTypeField: string(api.SecretTypeServiceAccountToken)}) - a.secrets, a.secretsReflector = cache.NewNamespaceKeyedIndexerAndReflector( - &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - options.FieldSelector = tokenSelector.String() - return cl.Core().Secrets(metav1.NamespaceAll).List(options) - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - options.FieldSelector = tokenSelector.String() - return cl.Core().Secrets(metav1.NamespaceAll).Watch(options) - }, - }, - &api.Secret{}, - 0, - ) +func (a *serviceAccount) SetInformerFactory(f informers.SharedInformerFactory) { + serviceAccountInformer := f.Core().InternalVersion().ServiceAccounts() + a.serviceAccountLister = serviceAccountInformer.Lister() - if cl != nil { - a.Run() - } + secretInformer := f.Core().InternalVersion().Secrets() + a.secretLister = secretInformer.Lister() + + a.SetReadyFunc(func() bool { + return serviceAccountInformer.Informer().HasSynced() && secretInformer.Informer().HasSynced() + }) } // Validate ensures an authorizer is set. @@ -144,35 +120,15 @@ func (a *serviceAccount) Validate() error { if a.client == nil { return fmt.Errorf("missing client") } - if a.secrets == nil { - return fmt.Errorf("missing secretsIndexer") + if a.secretLister == nil { + return fmt.Errorf("missing secretLister") } - if a.secretsReflector == nil { - return fmt.Errorf("missing secretsReflector") - } - if a.serviceAccounts == nil { - return fmt.Errorf("missing serviceAccountsIndexer") - } - if a.serviceAccountsReflector == nil { - return fmt.Errorf("missing serviceAccountsReflector") + if a.serviceAccountLister == nil { + return fmt.Errorf("missing serviceAccountLister") } return nil } -func (s *serviceAccount) Run() { - if s.stopChan == nil { - s.stopChan = make(chan struct{}) - s.serviceAccountsReflector.RunUntil(s.stopChan) - s.secretsReflector.RunUntil(s.stopChan) - } -} -func (s *serviceAccount) Stop() { - if s.stopChan != nil { - close(s.stopChan) - s.stopChan = nil - } -} - func (s *serviceAccount) Admit(a admission.Attributes) (err error) { if a.GetResource().GroupResource() != api.Resource("pods") { return nil @@ -269,17 +225,12 @@ func (s *serviceAccount) enforceMountableSecrets(serviceAccount *api.ServiceAcco // getServiceAccount returns the ServiceAccount for the given namespace and name if it exists func (s *serviceAccount) getServiceAccount(namespace string, name string) (*api.ServiceAccount, error) { - key := &api.ServiceAccount{ObjectMeta: metav1.ObjectMeta{Namespace: namespace}} - index, err := s.serviceAccounts.Index("namespace", key) - if err != nil { - return nil, err + serviceAccount, err := s.serviceAccountLister.ServiceAccounts(namespace).Get(name) + if err == nil { + return serviceAccount, nil } - - for _, obj := range index { - serviceAccount := obj.(*api.ServiceAccount) - if serviceAccount.Name == name { - return serviceAccount, nil - } + if !errors.IsNotFound(err) { + return nil, err } // Could not find in cache, attempt to look up directly @@ -332,15 +283,15 @@ func (s *serviceAccount) getReferencedServiceAccountToken(serviceAccount *api.Se // getServiceAccountTokens returns all ServiceAccountToken secrets for the given ServiceAccount func (s *serviceAccount) getServiceAccountTokens(serviceAccount *api.ServiceAccount) ([]*api.Secret, error) { - key := &api.Secret{ObjectMeta: metav1.ObjectMeta{Namespace: serviceAccount.Namespace}} - index, err := s.secrets.Index("namespace", key) + tokens, err := s.secretLister.Secrets(serviceAccount.Namespace).List(labels.Everything()) if err != nil { return nil, err } - tokens := []*api.Secret{} - for _, obj := range index { - token := obj.(*api.Secret) + for _, token := range tokens { + if token.Type != api.SecretTypeServiceAccountToken { + continue + } if serviceaccount.InternalIsServiceAccountToken(token, serviceAccount) { tokens = append(tokens, token) diff --git a/plugin/pkg/admission/serviceaccount/admission_test.go b/plugin/pkg/admission/serviceaccount/admission_test.go index dcbc5b59b23..0f639e69218 100644 --- a/plugin/pkg/admission/serviceaccount/admission_test.go +++ b/plugin/pkg/admission/serviceaccount/admission_test.go @@ -27,6 +27,8 @@ import ( "k8s.io/apiserver/pkg/admission" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" + informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion" + "k8s.io/kubernetes/pkg/controller" kubelet "k8s.io/kubernetes/pkg/kubelet/types" ) @@ -130,12 +132,13 @@ func TestAssignsDefaultServiceAccountAndToleratesMissingAPIToken(t *testing.T) { ns := "myns" admit := NewServiceAccount() - admit.SetInternalClientSet(nil) + informerFactory := informers.NewSharedInformerFactory(nil, controller.NoResyncPeriodFunc()) + admit.SetInformerFactory(informerFactory) admit.MountServiceAccountToken = true admit.RequireAPIToken = false // Add the default service account for the ns into the cache - admit.serviceAccounts.Add(&api.ServiceAccount{ + informerFactory.Core().InternalVersion().ServiceAccounts().Informer().GetStore().Add(&api.ServiceAccount{ ObjectMeta: metav1.ObjectMeta{ Name: DefaultServiceAccountName, Namespace: ns, @@ -157,12 +160,13 @@ func TestAssignsDefaultServiceAccountAndRejectsMissingAPIToken(t *testing.T) { ns := "myns" admit := NewServiceAccount() - admit.SetInternalClientSet(nil) + informerFactory := informers.NewSharedInformerFactory(nil, controller.NoResyncPeriodFunc()) + admit.SetInformerFactory(informerFactory) admit.MountServiceAccountToken = true admit.RequireAPIToken = true // Add the default service account for the ns into the cache - admit.serviceAccounts.Add(&api.ServiceAccount{ + informerFactory.Core().InternalVersion().ServiceAccounts().Informer().GetStore().Add(&api.ServiceAccount{ ObjectMeta: metav1.ObjectMeta{ Name: DefaultServiceAccountName, Namespace: ns, @@ -189,7 +193,8 @@ func TestFetchesUncachedServiceAccount(t *testing.T) { }) admit := NewServiceAccount() - admit.SetInternalClientSet(nil) + informerFactory := informers.NewSharedInformerFactory(nil, controller.NoResyncPeriodFunc()) + admit.SetInformerFactory(informerFactory) admit.client = client admit.RequireAPIToken = false @@ -212,6 +217,8 @@ func TestDeniesInvalidServiceAccount(t *testing.T) { admit := NewServiceAccount() admit.SetInternalClientSet(client) + informerFactory := informers.NewSharedInformerFactory(nil, controller.NoResyncPeriodFunc()) + admit.SetInformerFactory(informerFactory) pod := &api.Pod{} attrs := admission.NewAttributesRecord(pod, nil, api.Kind("Pod").WithVersion("version"), ns, "myname", api.Resource("pods").WithVersion("version"), "", admission.Create, nil) @@ -240,12 +247,13 @@ func TestAutomountsAPIToken(t *testing.T) { } admit := NewServiceAccount() - admit.SetInternalClientSet(nil) + informerFactory := informers.NewSharedInformerFactory(nil, controller.NoResyncPeriodFunc()) + admit.SetInformerFactory(informerFactory) admit.MountServiceAccountToken = true admit.RequireAPIToken = true // Add the default service account for the ns with a token into the cache - admit.serviceAccounts.Add(&api.ServiceAccount{ + informerFactory.Core().InternalVersion().ServiceAccounts().Informer().GetStore().Add(&api.ServiceAccount{ ObjectMeta: metav1.ObjectMeta{ Name: serviceAccountName, Namespace: ns, @@ -256,7 +264,7 @@ func TestAutomountsAPIToken(t *testing.T) { }, }) // Add a token for the service account into the cache - admit.secrets.Add(&api.Secret{ + informerFactory.Core().InternalVersion().Secrets().Informer().GetStore().Add(&api.Secret{ ObjectMeta: metav1.ObjectMeta{ Name: tokenName, Namespace: ns, @@ -340,12 +348,13 @@ func TestRespectsExistingMount(t *testing.T) { } admit := NewServiceAccount() - admit.SetInternalClientSet(nil) + informerFactory := informers.NewSharedInformerFactory(nil, controller.NoResyncPeriodFunc()) + admit.SetInformerFactory(informerFactory) admit.MountServiceAccountToken = true admit.RequireAPIToken = true // Add the default service account for the ns with a token into the cache - admit.serviceAccounts.Add(&api.ServiceAccount{ + informerFactory.Core().InternalVersion().ServiceAccounts().Informer().GetStore().Add(&api.ServiceAccount{ ObjectMeta: metav1.ObjectMeta{ Name: serviceAccountName, Namespace: ns, @@ -356,7 +365,7 @@ func TestRespectsExistingMount(t *testing.T) { }, }) // Add a token for the service account into the cache - admit.secrets.Add(&api.Secret{ + informerFactory.Core().InternalVersion().Secrets().Informer().GetStore().Add(&api.Secret{ ObjectMeta: metav1.ObjectMeta{ Name: tokenName, Namespace: ns, @@ -437,12 +446,13 @@ func TestAllowsReferencedSecret(t *testing.T) { ns := "myns" admit := NewServiceAccount() - admit.SetInternalClientSet(nil) + informerFactory := informers.NewSharedInformerFactory(nil, controller.NoResyncPeriodFunc()) + admit.SetInformerFactory(informerFactory) admit.LimitSecretReferences = true admit.RequireAPIToken = false // Add the default service account for the ns with a secret reference into the cache - admit.serviceAccounts.Add(&api.ServiceAccount{ + informerFactory.Core().InternalVersion().ServiceAccounts().Informer().GetStore().Add(&api.ServiceAccount{ ObjectMeta: metav1.ObjectMeta{ Name: DefaultServiceAccountName, Namespace: ns, @@ -517,12 +527,13 @@ func TestRejectsUnreferencedSecretVolumes(t *testing.T) { ns := "myns" admit := NewServiceAccount() - admit.SetInternalClientSet(nil) + informerFactory := informers.NewSharedInformerFactory(nil, controller.NoResyncPeriodFunc()) + admit.SetInformerFactory(informerFactory) admit.LimitSecretReferences = true admit.RequireAPIToken = false // Add the default service account for the ns into the cache - admit.serviceAccounts.Add(&api.ServiceAccount{ + informerFactory.Core().InternalVersion().ServiceAccounts().Informer().GetStore().Add(&api.ServiceAccount{ ObjectMeta: metav1.ObjectMeta{ Name: DefaultServiceAccountName, Namespace: ns, @@ -594,12 +605,13 @@ func TestAllowUnreferencedSecretVolumesForPermissiveSAs(t *testing.T) { ns := "myns" admit := NewServiceAccount() - admit.SetInternalClientSet(nil) + informerFactory := informers.NewSharedInformerFactory(nil, controller.NoResyncPeriodFunc()) + admit.SetInformerFactory(informerFactory) admit.LimitSecretReferences = false admit.RequireAPIToken = false // Add the default service account for the ns into the cache - admit.serviceAccounts.Add(&api.ServiceAccount{ + informerFactory.Core().InternalVersion().ServiceAccounts().Informer().GetStore().Add(&api.ServiceAccount{ ObjectMeta: metav1.ObjectMeta{ Name: DefaultServiceAccountName, Namespace: ns, @@ -625,12 +637,13 @@ func TestAllowsReferencedImagePullSecrets(t *testing.T) { ns := "myns" admit := NewServiceAccount() - admit.SetInternalClientSet(nil) + informerFactory := informers.NewSharedInformerFactory(nil, controller.NoResyncPeriodFunc()) + admit.SetInformerFactory(informerFactory) admit.LimitSecretReferences = true admit.RequireAPIToken = false // Add the default service account for the ns with a secret reference into the cache - admit.serviceAccounts.Add(&api.ServiceAccount{ + informerFactory.Core().InternalVersion().ServiceAccounts().Informer().GetStore().Add(&api.ServiceAccount{ ObjectMeta: metav1.ObjectMeta{ Name: DefaultServiceAccountName, Namespace: ns, @@ -656,12 +669,13 @@ func TestRejectsUnreferencedImagePullSecrets(t *testing.T) { ns := "myns" admit := NewServiceAccount() - admit.SetInternalClientSet(nil) + informerFactory := informers.NewSharedInformerFactory(nil, controller.NoResyncPeriodFunc()) + admit.SetInformerFactory(informerFactory) admit.LimitSecretReferences = true admit.RequireAPIToken = false // Add the default service account for the ns into the cache - admit.serviceAccounts.Add(&api.ServiceAccount{ + informerFactory.Core().InternalVersion().ServiceAccounts().Informer().GetStore().Add(&api.ServiceAccount{ ObjectMeta: metav1.ObjectMeta{ Name: DefaultServiceAccountName, Namespace: ns, @@ -684,12 +698,13 @@ func TestDoNotAddImagePullSecrets(t *testing.T) { ns := "myns" admit := NewServiceAccount() - admit.SetInternalClientSet(nil) + informerFactory := informers.NewSharedInformerFactory(nil, controller.NoResyncPeriodFunc()) + admit.SetInformerFactory(informerFactory) admit.LimitSecretReferences = true admit.RequireAPIToken = false // Add the default service account for the ns with a secret reference into the cache - admit.serviceAccounts.Add(&api.ServiceAccount{ + informerFactory.Core().InternalVersion().ServiceAccounts().Informer().GetStore().Add(&api.ServiceAccount{ ObjectMeta: metav1.ObjectMeta{ Name: DefaultServiceAccountName, Namespace: ns, @@ -720,7 +735,8 @@ func TestAddImagePullSecrets(t *testing.T) { ns := "myns" admit := NewServiceAccount() - admit.SetInternalClientSet(nil) + informerFactory := informers.NewSharedInformerFactory(nil, controller.NoResyncPeriodFunc()) + admit.SetInformerFactory(informerFactory) admit.LimitSecretReferences = true admit.RequireAPIToken = false @@ -735,7 +751,7 @@ func TestAddImagePullSecrets(t *testing.T) { }, } // Add the default service account for the ns with a secret reference into the cache - admit.serviceAccounts.Add(sa) + informerFactory.Core().InternalVersion().ServiceAccounts().Informer().GetStore().Add(sa) pod := &api.Pod{} attrs := admission.NewAttributesRecord(pod, nil, api.Kind("Pod").WithVersion("version"), ns, "myname", api.Resource("pods").WithVersion("version"), "", admission.Create, nil) @@ -764,7 +780,8 @@ func TestMultipleReferencedSecrets(t *testing.T) { ) admit := NewServiceAccount() - admit.SetInternalClientSet(nil) + informerFactory := informers.NewSharedInformerFactory(nil, controller.NoResyncPeriodFunc()) + admit.SetInformerFactory(informerFactory) admit.MountServiceAccountToken = true admit.RequireAPIToken = true @@ -779,10 +796,10 @@ func TestMultipleReferencedSecrets(t *testing.T) { {Name: token2}, }, } - admit.serviceAccounts.Add(sa) + informerFactory.Core().InternalVersion().ServiceAccounts().Informer().GetStore().Add(sa) // Add two tokens for the service account into the cache. - admit.secrets.Add(&api.Secret{ + informerFactory.Core().InternalVersion().Secrets().Informer().GetStore().Add(&api.Secret{ ObjectMeta: metav1.ObjectMeta{ Name: token2, Namespace: ns, @@ -796,7 +813,7 @@ func TestMultipleReferencedSecrets(t *testing.T) { api.ServiceAccountTokenKey: []byte("token-data"), }, }) - admit.secrets.Add(&api.Secret{ + informerFactory.Core().InternalVersion().Secrets().Informer().GetStore().Add(&api.Secret{ ObjectMeta: metav1.ObjectMeta{ Name: token1, Namespace: ns, diff --git a/plugin/pkg/admission/storageclass/default/BUILD b/plugin/pkg/admission/storageclass/default/BUILD index f326e466433..f68808f2785 100644 --- a/plugin/pkg/admission/storageclass/default/BUILD +++ b/plugin/pkg/admission/storageclass/default/BUILD @@ -16,15 +16,13 @@ go_library( "//pkg/api:go_default_library", "//pkg/apis/storage:go_default_library", "//pkg/apis/storage/util:go_default_library", - "//pkg/client/clientset_generated/internalclientset:go_default_library", + "//pkg/client/informers/informers_generated/internalversion:go_default_library", + "//pkg/client/listers/storage/internalversion:go_default_library", "//pkg/kubeapiserver/admission:go_default_library", "//vendor:github.com/golang/glog", "//vendor:k8s.io/apimachinery/pkg/api/errors", - "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", - "//vendor:k8s.io/apimachinery/pkg/runtime", - "//vendor:k8s.io/apimachinery/pkg/watch", + "//vendor:k8s.io/apimachinery/pkg/labels", "//vendor:k8s.io/apiserver/pkg/admission", - "//vendor:k8s.io/client-go/tools/cache", ], ) @@ -37,6 +35,8 @@ go_test( "//pkg/api:go_default_library", "//pkg/apis/storage:go_default_library", "//pkg/apis/storage/util:go_default_library", + "//pkg/client/informers/informers_generated/internalversion:go_default_library", + "//pkg/controller:go_default_library", "//vendor:github.com/golang/glog", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", "//vendor:k8s.io/apiserver/pkg/admission", diff --git a/plugin/pkg/admission/storageclass/default/admission.go b/plugin/pkg/admission/storageclass/default/admission.go index dff36874286..86662515ff7 100644 --- a/plugin/pkg/admission/storageclass/default/admission.go +++ b/plugin/pkg/admission/storageclass/default/admission.go @@ -23,15 +23,13 @@ import ( "github.com/golang/glog" "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/watch" + "k8s.io/apimachinery/pkg/labels" admission "k8s.io/apiserver/pkg/admission" - "k8s.io/client-go/tools/cache" api "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/apis/storage" storageutil "k8s.io/kubernetes/pkg/apis/storage/util" - "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" + informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion" + storagelisters "k8s.io/kubernetes/pkg/client/listers/storage/internalversion" kubeapiserveradmission "k8s.io/kubernetes/pkg/kubeapiserver/admission" ) @@ -49,15 +47,12 @@ func init() { // claimDefaulterPlugin holds state for and implements the admission plugin. type claimDefaulterPlugin struct { *admission.Handler - client internalclientset.Interface - reflector *cache.Reflector - stopChan chan struct{} - store cache.Store + lister storagelisters.StorageClassLister } var _ admission.Interface = &claimDefaulterPlugin{} -var _ = kubeapiserveradmission.WantsInternalClientSet(&claimDefaulterPlugin{}) +var _ = kubeapiserveradmission.WantsInformerFactory(&claimDefaulterPlugin{}) // newPlugin creates a new admission plugin. func newPlugin() *claimDefaulterPlugin { @@ -66,55 +61,20 @@ func newPlugin() *claimDefaulterPlugin { } } -func (a *claimDefaulterPlugin) SetInternalClientSet(client internalclientset.Interface) { - a.client = client - a.store = cache.NewStore(cache.MetaNamespaceKeyFunc) - a.reflector = cache.NewReflector( - &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - return client.Storage().StorageClasses().List(options) - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - return client.Storage().StorageClasses().Watch(options) - }, - }, - &storage.StorageClass{}, - a.store, - 0, - ) - - if client != nil { - a.Run() - } +func (a *claimDefaulterPlugin) SetInformerFactory(f informers.SharedInformerFactory) { + informer := f.Storage().InternalVersion().StorageClasses() + a.lister = informer.Lister() + a.SetReadyFunc(informer.Informer().HasSynced) } // Validate ensures an authorizer is set. func (a *claimDefaulterPlugin) Validate() error { - if a.client == nil { - return fmt.Errorf("missing client") - } - if a.reflector == nil { - return fmt.Errorf("missing reflector") - } - if a.store == nil { - return fmt.Errorf("missing store") + if a.lister == nil { + return fmt.Errorf("missing lister") } return nil } -func (a *claimDefaulterPlugin) Run() { - if a.stopChan == nil { - a.stopChan = make(chan struct{}) - } - a.reflector.RunUntil(a.stopChan) -} -func (a *claimDefaulterPlugin) Stop() { - if a.stopChan != nil { - close(a.stopChan) - a.stopChan = nil - } -} - // Admit sets the default value of a PersistentVolumeClaim's storage class, in case the user did // not provide a value. // @@ -143,7 +103,7 @@ func (c *claimDefaulterPlugin) Admit(a admission.Attributes) error { glog.V(4).Infof("no storage class for claim %s (generate: %s)", pvc.Name, pvc.GenerateName) - def, err := getDefaultClass(c.store) + def, err := getDefaultClass(c.lister) if err != nil { return admission.NewForbidden(a, err) } @@ -161,13 +121,14 @@ func (c *claimDefaulterPlugin) Admit(a admission.Attributes) error { } // getDefaultClass returns the default StorageClass from the store, or nil. -func getDefaultClass(store cache.Store) (*storage.StorageClass, error) { +func getDefaultClass(lister storagelisters.StorageClassLister) (*storage.StorageClass, error) { + list, err := lister.List(labels.Everything()) + if err != nil { + return nil, err + } + defaultClasses := []*storage.StorageClass{} - for _, c := range store.List() { - class, ok := c.(*storage.StorageClass) - if !ok { - return nil, errors.NewInternalError(fmt.Errorf("error converting stored object to StorageClass: %v", c)) - } + for _, class := range list { if storageutil.IsDefaultAnnotation(class.ObjectMeta) { defaultClasses = append(defaultClasses, class) glog.V(4).Infof("getDefaultClass added: %s", class.Name) diff --git a/plugin/pkg/admission/storageclass/default/admission_test.go b/plugin/pkg/admission/storageclass/default/admission_test.go index ff953a17716..41d32eb3ffd 100644 --- a/plugin/pkg/admission/storageclass/default/admission_test.go +++ b/plugin/pkg/admission/storageclass/default/admission_test.go @@ -26,6 +26,8 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/apis/storage" storageutil "k8s.io/kubernetes/pkg/apis/storage/util" + informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion" + "k8s.io/kubernetes/pkg/controller" ) func TestAdmission(t *testing.T) { @@ -193,9 +195,10 @@ func TestAdmission(t *testing.T) { claim := clone.(*api.PersistentVolumeClaim) ctrl := newPlugin() - ctrl.SetInternalClientSet(nil) + informerFactory := informers.NewSharedInformerFactory(nil, controller.NoResyncPeriodFunc()) + ctrl.SetInformerFactory(informerFactory) for _, c := range test.classes { - ctrl.store.Add(c) + informerFactory.Storage().InternalVersion().StorageClasses().Informer().GetStore().Add(c) } attrs := admission.NewAttributesRecord( claim, // new object diff --git a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go index 0694dbf0bfc..7f298b51d58 100644 --- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go +++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go @@ -310,7 +310,12 @@ func ClusterRoles() []rbac.ClusterRole { "services", "serviceaccounts", ).RuleOrDie(), - rbac.NewRule("list", "watch").Groups(extensionsGroup).Resources("daemonsets", "deployments", "replicasets").RuleOrDie(), + rbac.NewRule("list", "watch").Groups(extensionsGroup).Resources( + "daemonsets", + "deployments", + "podsecuritypolicies", + "replicasets", + ).RuleOrDie(), rbac.NewRule("list", "watch").Groups(batchGroup).Resources("jobs", "cronjobs").RuleOrDie(), rbac.NewRule("list", "watch").Groups(appsGroup).Resources("statefulsets").RuleOrDie(), rbac.NewRule("list", "watch").Groups(policyGroup).Resources("poddisruptionbudgets").RuleOrDie(), diff --git a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/cluster-roles.yaml b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/cluster-roles.yaml index ca2d48125ba..b2a7c46e24a 100644 --- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/cluster-roles.yaml +++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/cluster-roles.yaml @@ -489,6 +489,7 @@ items: resources: - daemonsets - deployments + - podsecuritypolicies - replicasets verbs: - list diff --git a/test/integration/quota/quota_test.go b/test/integration/quota/quota_test.go index 942bb815258..3144baec0bb 100644 --- a/test/integration/quota/quota_test.go +++ b/test/integration/quota/quota_test.go @@ -39,6 +39,7 @@ import ( clientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions" + internalinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion" "k8s.io/kubernetes/pkg/controller" replicationcontroller "k8s.io/kubernetes/pkg/controller/replication" resourcequotacontroller "k8s.io/kubernetes/pkg/controller/resourcequota" @@ -73,6 +74,8 @@ func TestQuota(t *testing.T) { t.Fatalf("unexpected error: %v", err) } admission.(kubeadmission.WantsInternalClientSet).SetInternalClientSet(internalClientset) + internalInformers := internalinformers.NewSharedInformerFactory(internalClientset, controller.NoResyncPeriodFunc()) + admission.(kubeadmission.WantsInformerFactory).SetInformerFactory(internalInformers) defer close(admissionCh) masterConfig := framework.NewIntegrationTestMasterConfig() @@ -113,6 +116,7 @@ func TestQuota(t *testing.T) { ControllerFactory: resourcequotacontroller.NewReplenishmentControllerFactory(informers), } go resourcequotacontroller.NewResourceQuotaController(resourceQuotaControllerOptions).Run(2, controllerCh) + internalInformers.Start(controllerCh) informers.Start(controllerCh) startTime := time.Now() @@ -257,6 +261,8 @@ func TestQuotaLimitedResourceDenial(t *testing.T) { t.Fatalf("unexpected error: %v", err) } admission.(kubeadmission.WantsInternalClientSet).SetInternalClientSet(internalClientset) + internalInformers := internalinformers.NewSharedInformerFactory(internalClientset, controller.NoResyncPeriodFunc()) + admission.(kubeadmission.WantsInformerFactory).SetInformerFactory(internalInformers) defer close(admissionCh) masterConfig := framework.NewIntegrationTestMasterConfig() @@ -295,6 +301,7 @@ func TestQuotaLimitedResourceDenial(t *testing.T) { ControllerFactory: resourcequotacontroller.NewReplenishmentControllerFactory(informers), } go resourcequotacontroller.NewResourceQuotaController(resourceQuotaControllerOptions).Run(2, controllerCh) + internalInformers.Start(controllerCh) informers.Start(controllerCh) // try to create a pod diff --git a/test/integration/serviceaccount/service_account_test.go b/test/integration/serviceaccount/service_account_test.go index 1db4587a722..5e6633ff5af 100644 --- a/test/integration/serviceaccount/service_account_test.go +++ b/test/integration/serviceaccount/service_account_test.go @@ -48,6 +48,7 @@ import ( "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions" + internalinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion" "k8s.io/kubernetes/pkg/controller" serviceaccountcontroller "k8s.io/kubernetes/pkg/controller/serviceaccount" "k8s.io/kubernetes/pkg/serviceaccount" @@ -407,6 +408,8 @@ func startServiceAccountTestServer(t *testing.T) (*clientset.Clientset, restclie // Set up admission plugin to auto-assign serviceaccounts to pods serviceAccountAdmission := serviceaccountadmission.NewServiceAccount() serviceAccountAdmission.SetInternalClientSet(internalRootClientset) + internalInformers := internalinformers.NewSharedInformerFactory(internalRootClientset, controller.NoResyncPeriodFunc()) + serviceAccountAdmission.SetInformerFactory(internalInformers) masterConfig := framework.NewMasterConfig() masterConfig.GenericConfig.EnableIndex = true @@ -428,13 +431,11 @@ func startServiceAccountTestServer(t *testing.T) (*clientset.Clientset, restclie serviceaccountcontroller.DefaultServiceAccountsControllerOptions(), ) informers.Start(stopCh) + internalInformers.Start(stopCh) go serviceAccountController.Run(5, stopCh) - // Start the admission plugin reflectors - serviceAccountAdmission.Run() stop := func() { close(stopCh) - serviceAccountAdmission.Stop() apiServer.Close() }