Fix ResourceQuota admission shutdown

This commit is contained in:
Wojciech Tyczyński 2022-05-18 19:30:23 +02:00
parent fbb5717279
commit f8211d7e44
15 changed files with 143 additions and 41 deletions

View File

@ -115,7 +115,7 @@ func newGCPermissionsEnforcement() (*gcPermissionsEnforcement, error) {
whiteList: whiteList,
}
genericPluginInitializer := initializer.New(nil, nil, fakeAuthorizer{}, nil)
genericPluginInitializer := initializer.New(nil, nil, fakeAuthorizer{}, nil, nil)
fakeDiscoveryClient := &fakediscovery.FakeDiscovery{Fake: &coretesting.Fake{}}
fakeDiscoveryClient.Resources = []*metav1.APIResourceList{
{

View File

@ -790,7 +790,7 @@ func newHandlerForTest(c clientset.Interface) (*LimitRanger, informers.SharedInf
if err != nil {
return nil, f, err
}
pluginInitializer := genericadmissioninitializer.New(c, f, nil, nil)
pluginInitializer := genericadmissioninitializer.New(c, f, nil, nil, nil)
pluginInitializer.Initialize(handler)
err = admission.ValidateInitialization(handler)
return handler, f, err

View File

@ -41,7 +41,7 @@ import (
func newHandlerForTest(c clientset.Interface) (admission.MutationInterface, informers.SharedInformerFactory, error) {
f := informers.NewSharedInformerFactory(c, 5*time.Minute)
handler := NewProvision()
pluginInitializer := genericadmissioninitializer.New(c, f, nil, nil)
pluginInitializer := genericadmissioninitializer.New(c, f, nil, nil, nil)
pluginInitializer.Initialize(handler)
err := admission.ValidateInitialization(handler)
return handler, f, err

View File

@ -39,7 +39,7 @@ import (
func newHandlerForTest(c kubernetes.Interface) (admission.ValidationInterface, informers.SharedInformerFactory, error) {
f := informers.NewSharedInformerFactory(c, 5*time.Minute)
handler := NewExists()
pluginInitializer := genericadmissioninitializer.New(c, f, nil, nil)
pluginInitializer := genericadmissioninitializer.New(c, f, nil, nil, nil)
pluginInitializer.Initialize(handler)
err := admission.ValidateInitialization(handler)
return handler, f, err

View File

@ -198,7 +198,7 @@ func TestHandles(t *testing.T) {
func newHandlerForTest(c kubernetes.Interface) (*Plugin, informers.SharedInformerFactory, error) {
f := informers.NewSharedInformerFactory(c, 5*time.Minute)
handler := NewPodNodeSelector(nil)
pluginInitializer := genericadmissioninitializer.New(c, f, nil, nil)
pluginInitializer := genericadmissioninitializer.New(c, f, nil, nil, nil)
pluginInitializer.Initialize(handler)
err := admission.ValidateInitialization(handler)
return handler, f, err

View File

@ -356,7 +356,7 @@ func newHandlerForTest(c kubernetes.Interface) (*Plugin, informers.SharedInforme
return nil, nil, err
}
handler := NewPodTolerationsPlugin(pluginConfig)
pluginInitializer := genericadmissioninitializer.New(c, f, nil, nil)
pluginInitializer := genericadmissioninitializer.New(c, f, nil, nil, nil)
pluginInitializer.Initialize(handler)
err = admission.ValidateInitialization(handler)
return handler, f, err

View File

@ -28,6 +28,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/admission"
genericadmissioninitializer "k8s.io/apiserver/pkg/admission/initializer"
"k8s.io/apiserver/pkg/admission/plugin/resourcequota"
resourcequotaapi "k8s.io/apiserver/pkg/admission/plugin/resourcequota/apis/resourcequota"
"k8s.io/client-go/informers"
@ -36,6 +37,7 @@ import (
testcore "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
api "k8s.io/kubernetes/pkg/apis/core"
kubeapiserveradmission "k8s.io/kubernetes/pkg/kubeapiserver/admission"
"k8s.io/kubernetes/pkg/quota/v1/install"
)
@ -99,15 +101,18 @@ func createHandlerWithConfig(kubeClient kubernetes.Interface, informerFactory in
}
quotaConfiguration := install.NewQuotaConfigurationForAdmission()
handler, err := resourcequota.NewResourceQuota(config, 5, stopCh)
handler, err := resourcequota.NewResourceQuota(config, 5)
if err != nil {
return nil, err
}
handler.SetExternalKubeClientSet(kubeClient)
handler.SetExternalKubeInformerFactory(informerFactory)
handler.SetQuotaConfiguration(quotaConfiguration)
return handler, nil
initializers := admission.PluginInitializers{
genericadmissioninitializer.New(kubeClient, informerFactory, nil, nil, stopCh),
kubeapiserveradmission.NewPluginInitializer(nil, nil, quotaConfiguration),
}
initializers.Initialize(handler)
return handler, admission.ValidateInitialization(handler)
}
// TestAdmissionIgnoresDelete verifies that the admission controller ignores delete operations

View File

@ -29,6 +29,7 @@ type pluginInitializer struct {
externalInformers informers.SharedInformerFactory
authorizer authorizer.Authorizer
featureGates featuregate.FeatureGate
stopCh <-chan struct{}
}
// New creates an instance of admission plugins initializer.
@ -39,19 +40,26 @@ func New(
extInformers informers.SharedInformerFactory,
authz authorizer.Authorizer,
featureGates featuregate.FeatureGate,
stopCh <-chan struct{},
) pluginInitializer {
return pluginInitializer{
externalClient: extClientset,
externalInformers: extInformers,
authorizer: authz,
featureGates: featureGates,
stopCh: stopCh,
}
}
// Initialize checks the initialization interfaces implemented by a plugin
// and provide the appropriate initialization data
func (i pluginInitializer) Initialize(plugin admission.Interface) {
// First tell the plugin about enabled features, so it can decide whether to start informers or not
// First tell the plugin about drained notification, so it can pass it to further initializations.
if wants, ok := plugin.(WantsDrainedNotification); ok {
wants.SetDrainedNotification(i.stopCh)
}
// Second tell the plugin about enabled features, so it can decide whether to start informers or not
if wants, ok := plugin.(WantsFeatures); ok {
wants.InspectFeatureGates(i.featureGates)
}

View File

@ -32,7 +32,7 @@ import (
// TestWantsAuthorizer ensures that the authorizer is injected
// when the WantsAuthorizer interface is implemented by a plugin.
func TestWantsAuthorizer(t *testing.T) {
target := initializer.New(nil, nil, &TestAuthorizer{}, nil)
target := initializer.New(nil, nil, &TestAuthorizer{}, nil, nil)
wantAuthorizerAdmission := &WantAuthorizerAdmission{}
target.Initialize(wantAuthorizerAdmission)
if wantAuthorizerAdmission.auth == nil {
@ -44,7 +44,7 @@ func TestWantsAuthorizer(t *testing.T) {
// when the WantsExternalKubeClientSet interface is implemented by a plugin.
func TestWantsExternalKubeClientSet(t *testing.T) {
cs := &fake.Clientset{}
target := initializer.New(cs, nil, &TestAuthorizer{}, nil)
target := initializer.New(cs, nil, &TestAuthorizer{}, nil, nil)
wantExternalKubeClientSet := &WantExternalKubeClientSet{}
target.Initialize(wantExternalKubeClientSet)
if wantExternalKubeClientSet.cs != cs {
@ -57,7 +57,7 @@ func TestWantsExternalKubeClientSet(t *testing.T) {
func TestWantsExternalKubeInformerFactory(t *testing.T) {
cs := &fake.Clientset{}
sf := informers.NewSharedInformerFactory(cs, time.Duration(1)*time.Second)
target := initializer.New(cs, sf, &TestAuthorizer{}, nil)
target := initializer.New(cs, sf, &TestAuthorizer{}, nil, nil)
wantExternalKubeInformerFactory := &WantExternalKubeInformerFactory{}
target.Initialize(wantExternalKubeInformerFactory)
if wantExternalKubeInformerFactory.sf != sf {
@ -65,6 +65,18 @@ func TestWantsExternalKubeInformerFactory(t *testing.T) {
}
}
// TestWantsShutdownSignal ensures that the shutdown signal is injected
// when the WantsShutdownSignal interface is implemented by a plugin.
func TestWantsShutdownNotification(t *testing.T) {
stopCh := make(chan struct{})
target := initializer.New(nil, nil, &TestAuthorizer{}, nil, stopCh)
wantDrainedNotification := &WantDrainedNotification{}
target.Initialize(wantDrainedNotification)
if wantDrainedNotification.stopCh == nil {
t.Errorf("expected stopCh to be initialized but found nil")
}
}
// WantExternalKubeInformerFactory is a test stub that fulfills the WantsExternalKubeInformerFactory interface
type WantExternalKubeInformerFactory struct {
sf informers.SharedInformerFactory
@ -114,6 +126,23 @@ func (self *WantAuthorizerAdmission) ValidateInitialization() error { retur
var _ admission.Interface = &WantAuthorizerAdmission{}
var _ initializer.WantsAuthorizer = &WantAuthorizerAdmission{}
// WantDrainedNotification is a test stub that filfills the WantsDrainedNotification interface.
type WantDrainedNotification struct {
stopCh <-chan struct{}
}
func (self *WantDrainedNotification) SetDrainedNotification(stopCh <-chan struct{}) {
self.stopCh = stopCh
}
func (self *WantDrainedNotification) Admit(ctx context.Context, a admission.Attributes, o admission.ObjectInterfaces) error {
return nil
}
func (self *WantDrainedNotification) Handles(o admission.Operation) bool { return false }
func (self *WantDrainedNotification) ValidateInitialization() error { return nil }
var _ admission.Interface = &WantDrainedNotification{}
var _ initializer.WantsDrainedNotification = &WantDrainedNotification{}
// TestAuthorizer is a test stub that fulfills the WantsAuthorizer interface.
type TestAuthorizer struct{}

View File

@ -49,6 +49,14 @@ type WantsQuotaConfiguration interface {
admission.InitializationValidator
}
// WantsDrainedNotification defines a function which sets the notification of where the apiserver
// has already been drained for admission plugins that need it.
// After receiving that notification, Admit/Validate calls won't be called anymore.
type WantsDrainedNotification interface {
SetDrainedNotification(<-chan struct{})
admission.InitializationValidator
}
// WantsFeatureGate defines a function which passes the featureGates for inspection by an admission plugin.
// Admission plugins should not hold a reference to the featureGates. Instead, they should query a particular one
// and assign it to a simple bool in the admission plugin struct.

View File

@ -53,7 +53,7 @@ func newHandlerForTestWithClock(c clientset.Interface, cacheClock clock.Clock) (
if err != nil {
return nil, f, err
}
pluginInitializer := kubeadmission.New(c, f, nil, nil)
pluginInitializer := kubeadmission.New(c, f, nil, nil, nil)
pluginInitializer.Initialize(handler)
err = admission.ValidateInitialization(handler)
return handler, f, err

View File

@ -37,7 +37,10 @@ import (
// PluginName is a string with the name of the plugin
const PluginName = "ResourceQuota"
var namespaceGVK = v1.SchemeGroupVersion.WithKind("Namespace").GroupKind()
var (
namespaceGVK = v1.SchemeGroupVersion.WithKind("Namespace").GroupKind()
stopChUnconfiguredErr = fmt.Errorf("quota configuration configured between stop channel")
)
// Register registers a plugin
func Register(plugins *admission.Plugins) {
@ -54,7 +57,7 @@ func Register(plugins *admission.Plugins) {
return nil, errs.ToAggregate()
}
}
return NewResourceQuota(configuration, 5, make(chan struct{}))
return NewResourceQuota(configuration, 5)
})
}
@ -67,12 +70,14 @@ type QuotaAdmission struct {
numEvaluators int
quotaAccessor *quotaAccessor
evaluator Evaluator
initializationErr error
}
var _ admission.ValidationInterface = &QuotaAdmission{}
var _ = genericadmissioninitializer.WantsExternalKubeInformerFactory(&QuotaAdmission{})
var _ = genericadmissioninitializer.WantsExternalKubeClientSet(&QuotaAdmission{})
var _ = genericadmissioninitializer.WantsQuotaConfiguration(&QuotaAdmission{})
var _ = genericadmissioninitializer.WantsDrainedNotification(&QuotaAdmission{})
type liveLookupEntry struct {
expiry time.Time
@ -82,7 +87,7 @@ type liveLookupEntry struct {
// NewResourceQuota configures an admission controller that can enforce quota constraints
// using the provided registry. The registry must have the capability to handle group/kinds that
// are persisted by the server this admission controller is intercepting
func NewResourceQuota(config *resourcequotaapi.Configuration, numEvaluators int, stopCh <-chan struct{}) (*QuotaAdmission, error) {
func NewResourceQuota(config *resourcequotaapi.Configuration, numEvaluators int) (*QuotaAdmission, error) {
quotaAccessor, err := newQuotaAccessor()
if err != nil {
return nil, err
@ -90,13 +95,18 @@ func NewResourceQuota(config *resourcequotaapi.Configuration, numEvaluators int,
return &QuotaAdmission{
Handler: admission.NewHandler(admission.Create, admission.Update),
stopCh: stopCh,
stopCh: nil,
numEvaluators: numEvaluators,
config: config,
quotaAccessor: quotaAccessor,
}, nil
}
// SetDrainedNotification sets the stop channel into QuotaAdmission.
func (a *QuotaAdmission) SetDrainedNotification(stopCh <-chan struct{}) {
a.stopCh = stopCh
}
// SetExternalKubeClientSet registers the client into QuotaAdmission
func (a *QuotaAdmission) SetExternalKubeClientSet(client kubernetes.Interface) {
a.quotaAccessor.client = client
@ -110,11 +120,21 @@ func (a *QuotaAdmission) SetExternalKubeInformerFactory(f informers.SharedInform
// SetQuotaConfiguration assigns and initializes configuration and evaluator for QuotaAdmission
func (a *QuotaAdmission) SetQuotaConfiguration(c quota.Configuration) {
a.quotaConfiguration = c
if a.stopCh == nil {
a.initializationErr = stopChUnconfiguredErr
return
}
a.evaluator = NewQuotaEvaluator(a.quotaAccessor, a.quotaConfiguration.IgnoredResources(), generic.NewRegistry(a.quotaConfiguration.Evaluators()), nil, a.config, a.numEvaluators, a.stopCh)
}
// ValidateInitialization ensures an authorizer is set.
func (a *QuotaAdmission) ValidateInitialization() error {
if a.initializationErr != nil {
return a.initializationErr
}
if a.stopCh == nil {
return fmt.Errorf("missing stopCh")
}
if a.quotaAccessor == nil {
return fmt.Errorf("missing quotaAccessor")
}

View File

@ -26,6 +26,7 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/admission"
v1 "k8s.io/apiserver/pkg/admission/plugin/resourcequota/apis/resourcequota/v1"
"k8s.io/apiserver/pkg/quota/v1/generic"
)
func TestPrettyPrint(t *testing.T) {
@ -161,3 +162,14 @@ func TestExcludedOperations(t *testing.T) {
}
}
}
func TestInitializationOrder(t *testing.T) {
a := &QuotaAdmission{}
qca := generic.NewConfiguration(nil, nil)
a.SetQuotaConfiguration(qca)
if err := a.ValidateInitialization(); err != stopChUnconfiguredErr {
t.Errorf("unexpected error: %v", err)
}
}

View File

@ -143,9 +143,8 @@ func (a *AdmissionOptions) ApplyTo(
if err != nil {
return err
}
genericInitializer := initializer.New(clientset, informers, c.Authorization.Authorizer, features)
initializersChain := admission.PluginInitializers{}
pluginInitializers = append(pluginInitializers, genericInitializer)
genericInitializer := initializer.New(clientset, informers, c.Authorization.Authorizer, features, c.DrainedNotify())
initializersChain := admission.PluginInitializers{genericInitializer}
initializersChain = append(initializersChain, pluginInitializers...)
admissionChain, err := a.Plugins.NewFromPlugins(pluginNames, pluginsConfigProvider, initializersChain, a.Decorators)

View File

@ -35,6 +35,8 @@ import (
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/admission"
genericadmissioninitializer "k8s.io/apiserver/pkg/admission/initializer"
"k8s.io/apiserver/pkg/admission/plugin/resourcequota"
resourcequotaapi "k8s.io/apiserver/pkg/admission/plugin/resourcequota/apis/resourcequota"
"k8s.io/apiserver/pkg/quota/v1/generic"
@ -45,6 +47,7 @@ import (
"k8s.io/kubernetes/pkg/controller"
replicationcontroller "k8s.io/kubernetes/pkg/controller/replication"
resourcequotacontroller "k8s.io/kubernetes/pkg/controller/resourcequota"
kubeapiserveradmission "k8s.io/kubernetes/pkg/kubeapiserver/admission"
quotainstall "k8s.io/kubernetes/pkg/quota/v1/install"
"k8s.io/kubernetes/test/integration/framework"
)
@ -68,21 +71,27 @@ func TestQuota(t *testing.T) {
}))
admissionCh := make(chan struct{})
defer close(admissionCh)
clientset := clientset.NewForConfigOrDie(&restclient.Config{QPS: -1, Host: s.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
config := &resourcequotaapi.Configuration{}
admission, err := resourcequota.NewResourceQuota(config, 5, admissionCh)
admissionControl, err := resourcequota.NewResourceQuota(config, 5)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
admission.SetExternalKubeClientSet(clientset)
internalInformers := informers.NewSharedInformerFactory(clientset, controller.NoResyncPeriodFunc())
admission.SetExternalKubeInformerFactory(internalInformers)
qca := quotainstall.NewQuotaConfigurationForAdmission()
admission.SetQuotaConfiguration(qca)
defer close(admissionCh)
initializers := admission.PluginInitializers{
genericadmissioninitializer.New(clientset, internalInformers, nil, nil, admissionCh),
kubeapiserveradmission.NewPluginInitializer(nil, nil, qca),
}
initializers.Initialize(admissionControl)
if err := admission.ValidateInitialization(admissionControl); err != nil {
t.Fatalf("couldn't initialize resource quota: %v", err)
}
controlPlaneConfig := framework.NewIntegrationTestControlPlaneConfig()
controlPlaneConfig.GenericConfig.AdmissionControl = admission
controlPlaneConfig.GenericConfig.AdmissionControl = admissionControl
_, _, closeFn := framework.RunAnAPIServerUsingServer(controlPlaneConfig, s, h)
defer closeFn()
@ -291,6 +300,7 @@ func TestQuotaLimitedResourceDenial(t *testing.T) {
}))
admissionCh := make(chan struct{})
defer close(admissionCh)
clientset := clientset.NewForConfigOrDie(&restclient.Config{QPS: -1, Host: s.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
// stop creation of a pod resource unless there is a quota
@ -303,18 +313,23 @@ func TestQuotaLimitedResourceDenial(t *testing.T) {
},
}
qca := quotainstall.NewQuotaConfigurationForAdmission()
admission, err := resourcequota.NewResourceQuota(config, 5, admissionCh)
admissionControl, err := resourcequota.NewResourceQuota(config, 5)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
admission.SetExternalKubeClientSet(clientset)
externalInformers := informers.NewSharedInformerFactory(clientset, controller.NoResyncPeriodFunc())
admission.SetExternalKubeInformerFactory(externalInformers)
admission.SetQuotaConfiguration(qca)
defer close(admissionCh)
initializers := admission.PluginInitializers{
genericadmissioninitializer.New(clientset, externalInformers, nil, nil, admissionCh),
kubeapiserveradmission.NewPluginInitializer(nil, nil, qca),
}
initializers.Initialize(admissionControl)
if err := admission.ValidateInitialization(admissionControl); err != nil {
t.Fatalf("couldn't initialize resource quota: %v", err)
}
controlPlaneConfig := framework.NewIntegrationTestControlPlaneConfig()
controlPlaneConfig.GenericConfig.AdmissionControl = admission
controlPlaneConfig.GenericConfig.AdmissionControl = admissionControl
_, _, closeFn := framework.RunAnAPIServerUsingServer(controlPlaneConfig, s, h)
defer closeFn()
@ -419,6 +434,7 @@ func TestQuotaLimitService(t *testing.T) {
}))
admissionCh := make(chan struct{})
defer close(admissionCh)
clientset := clientset.NewForConfigOrDie(&restclient.Config{QPS: -1, Host: s.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
// stop creation of a pod resource unless there is a quota
@ -431,18 +447,23 @@ func TestQuotaLimitService(t *testing.T) {
},
}
qca := quotainstall.NewQuotaConfigurationForAdmission()
admission, err := resourcequota.NewResourceQuota(config, 5, admissionCh)
admissionControl, err := resourcequota.NewResourceQuota(config, 5)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
admission.SetExternalKubeClientSet(clientset)
externalInformers := informers.NewSharedInformerFactory(clientset, controller.NoResyncPeriodFunc())
admission.SetExternalKubeInformerFactory(externalInformers)
admission.SetQuotaConfiguration(qca)
defer close(admissionCh)
initializers := admission.PluginInitializers{
genericadmissioninitializer.New(clientset, externalInformers, nil, nil, admissionCh),
kubeapiserveradmission.NewPluginInitializer(nil, nil, qca),
}
initializers.Initialize(admissionControl)
if err := admission.ValidateInitialization(admissionControl); err != nil {
t.Fatalf("couldn't initialize resource quota: %v", err)
}
controlPlaneConfig := framework.NewIntegrationTestControlPlaneConfig()
controlPlaneConfig.GenericConfig.AdmissionControl = admission
controlPlaneConfig.GenericConfig.AdmissionControl = admissionControl
_, _, closeFn := framework.RunAnAPIServerUsingServer(controlPlaneConfig, s, h)
defer closeFn()