mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-28 22:17:14 +00:00
Merge pull request #28504 from deads2k/allow-quota-injection
Automatic merge from submit-queue allow lock acquisition injection for quota admission Allows for custom lock acquisition when composing the quota admission controller. @derekwaynecarr I'm still experimenting to make sure this satisfies the need downstream, but looking for agreement in principle
This commit is contained in:
commit
9a4cc9979a
@ -92,6 +92,9 @@ type sharedIndexInformer struct {
|
|||||||
// blockDeltas gives a way to stop all event distribution so that a late event handler
|
// blockDeltas gives a way to stop all event distribution so that a late event handler
|
||||||
// can safely join the shared informer.
|
// can safely join the shared informer.
|
||||||
blockDeltas sync.Mutex
|
blockDeltas sync.Mutex
|
||||||
|
// stopCh is the channel used to stop the main Run process. We have to track it so that
|
||||||
|
// late joiners can have a proper stop
|
||||||
|
stopCh <-chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
// dummyController hides the fact that a SharedInformer is different from a dedicated one
|
// dummyController hides the fact that a SharedInformer is different from a dedicated one
|
||||||
@ -146,6 +149,7 @@ func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
|
|||||||
s.started = true
|
s.started = true
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
s.stopCh = stopCh
|
||||||
s.processor.run(stopCh)
|
s.processor.run(stopCh)
|
||||||
s.controller.Run(stopCh)
|
s.controller.Run(stopCh)
|
||||||
}
|
}
|
||||||
@ -220,6 +224,9 @@ func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) erro
|
|||||||
listener := newProcessListener(handler)
|
listener := newProcessListener(handler)
|
||||||
s.processor.listeners = append(s.processor.listeners, listener)
|
s.processor.listeners = append(s.processor.listeners, listener)
|
||||||
|
|
||||||
|
go listener.run(s.stopCh)
|
||||||
|
go listener.pop(s.stopCh)
|
||||||
|
|
||||||
items := s.indexer.List()
|
items := s.indexer.List()
|
||||||
for i := range items {
|
for i := range items {
|
||||||
listener.add(addNotification{newObj: items[i]})
|
listener.add(addNotification{newObj: items[i]})
|
||||||
|
@ -59,7 +59,7 @@ func NewResourceQuota(client clientset.Interface, registry quota.Registry, numEv
|
|||||||
}
|
}
|
||||||
go quotaAccessor.Run(stopCh)
|
go quotaAccessor.Run(stopCh)
|
||||||
|
|
||||||
evaluator := NewQuotaEvaluator(quotaAccessor, registry, numEvaluators, stopCh)
|
evaluator := NewQuotaEvaluator(quotaAccessor, registry, nil, numEvaluators, stopCh)
|
||||||
|
|
||||||
return "aAdmission{
|
return "aAdmission{
|
||||||
Handler: admission.NewHandler(admission.Create, admission.Update),
|
Handler: admission.NewHandler(admission.Create, admission.Update),
|
||||||
|
@ -150,7 +150,7 @@ func TestAdmissionIgnoresSubresources(t *testing.T) {
|
|||||||
quotaAccessor, _ := newQuotaAccessor(kubeClient)
|
quotaAccessor, _ := newQuotaAccessor(kubeClient)
|
||||||
quotaAccessor.indexer = indexer
|
quotaAccessor.indexer = indexer
|
||||||
go quotaAccessor.Run(stopCh)
|
go quotaAccessor.Run(stopCh)
|
||||||
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), 5, stopCh)
|
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), nil, 5, stopCh)
|
||||||
|
|
||||||
defer utilruntime.HandleCrash()
|
defer utilruntime.HandleCrash()
|
||||||
handler := "aAdmission{
|
handler := "aAdmission{
|
||||||
@ -194,7 +194,7 @@ func TestAdmitBelowQuotaLimit(t *testing.T) {
|
|||||||
quotaAccessor, _ := newQuotaAccessor(kubeClient)
|
quotaAccessor, _ := newQuotaAccessor(kubeClient)
|
||||||
quotaAccessor.indexer = indexer
|
quotaAccessor.indexer = indexer
|
||||||
go quotaAccessor.Run(stopCh)
|
go quotaAccessor.Run(stopCh)
|
||||||
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), 5, stopCh)
|
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), nil, 5, stopCh)
|
||||||
|
|
||||||
defer utilruntime.HandleCrash()
|
defer utilruntime.HandleCrash()
|
||||||
handler := "aAdmission{
|
handler := "aAdmission{
|
||||||
@ -277,7 +277,7 @@ func TestAdmitHandlesOldObjects(t *testing.T) {
|
|||||||
quotaAccessor, _ := newQuotaAccessor(kubeClient)
|
quotaAccessor, _ := newQuotaAccessor(kubeClient)
|
||||||
quotaAccessor.indexer = indexer
|
quotaAccessor.indexer = indexer
|
||||||
go quotaAccessor.Run(stopCh)
|
go quotaAccessor.Run(stopCh)
|
||||||
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), 5, stopCh)
|
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), nil, 5, stopCh)
|
||||||
|
|
||||||
handler := "aAdmission{
|
handler := "aAdmission{
|
||||||
Handler: admission.NewHandler(admission.Create, admission.Update),
|
Handler: admission.NewHandler(admission.Create, admission.Update),
|
||||||
@ -367,7 +367,7 @@ func TestAdmitExceedQuotaLimit(t *testing.T) {
|
|||||||
quotaAccessor, _ := newQuotaAccessor(kubeClient)
|
quotaAccessor, _ := newQuotaAccessor(kubeClient)
|
||||||
quotaAccessor.indexer = indexer
|
quotaAccessor.indexer = indexer
|
||||||
go quotaAccessor.Run(stopCh)
|
go quotaAccessor.Run(stopCh)
|
||||||
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), 5, stopCh)
|
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), nil, 5, stopCh)
|
||||||
|
|
||||||
defer utilruntime.HandleCrash()
|
defer utilruntime.HandleCrash()
|
||||||
handler := "aAdmission{
|
handler := "aAdmission{
|
||||||
@ -411,7 +411,7 @@ func TestAdmitEnforceQuotaConstraints(t *testing.T) {
|
|||||||
quotaAccessor, _ := newQuotaAccessor(kubeClient)
|
quotaAccessor, _ := newQuotaAccessor(kubeClient)
|
||||||
quotaAccessor.indexer = indexer
|
quotaAccessor.indexer = indexer
|
||||||
go quotaAccessor.Run(stopCh)
|
go quotaAccessor.Run(stopCh)
|
||||||
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), 5, stopCh)
|
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), nil, 5, stopCh)
|
||||||
|
|
||||||
defer utilruntime.HandleCrash()
|
defer utilruntime.HandleCrash()
|
||||||
handler := "aAdmission{
|
handler := "aAdmission{
|
||||||
@ -465,7 +465,7 @@ func TestAdmitPodInNamespaceWithoutQuota(t *testing.T) {
|
|||||||
quotaAccessor.indexer = indexer
|
quotaAccessor.indexer = indexer
|
||||||
quotaAccessor.liveLookupCache = liveLookupCache
|
quotaAccessor.liveLookupCache = liveLookupCache
|
||||||
go quotaAccessor.Run(stopCh)
|
go quotaAccessor.Run(stopCh)
|
||||||
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), 5, stopCh)
|
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), nil, 5, stopCh)
|
||||||
|
|
||||||
defer utilruntime.HandleCrash()
|
defer utilruntime.HandleCrash()
|
||||||
handler := "aAdmission{
|
handler := "aAdmission{
|
||||||
@ -531,7 +531,7 @@ func TestAdmitBelowTerminatingQuotaLimit(t *testing.T) {
|
|||||||
quotaAccessor, _ := newQuotaAccessor(kubeClient)
|
quotaAccessor, _ := newQuotaAccessor(kubeClient)
|
||||||
quotaAccessor.indexer = indexer
|
quotaAccessor.indexer = indexer
|
||||||
go quotaAccessor.Run(stopCh)
|
go quotaAccessor.Run(stopCh)
|
||||||
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), 5, stopCh)
|
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), nil, 5, stopCh)
|
||||||
|
|
||||||
defer utilruntime.HandleCrash()
|
defer utilruntime.HandleCrash()
|
||||||
handler := "aAdmission{
|
handler := "aAdmission{
|
||||||
@ -636,7 +636,7 @@ func TestAdmitBelowBestEffortQuotaLimit(t *testing.T) {
|
|||||||
quotaAccessor, _ := newQuotaAccessor(kubeClient)
|
quotaAccessor, _ := newQuotaAccessor(kubeClient)
|
||||||
quotaAccessor.indexer = indexer
|
quotaAccessor.indexer = indexer
|
||||||
go quotaAccessor.Run(stopCh)
|
go quotaAccessor.Run(stopCh)
|
||||||
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), 5, stopCh)
|
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), nil, 5, stopCh)
|
||||||
|
|
||||||
defer utilruntime.HandleCrash()
|
defer utilruntime.HandleCrash()
|
||||||
handler := "aAdmission{
|
handler := "aAdmission{
|
||||||
@ -728,7 +728,7 @@ func TestAdmitBestEffortQuotaLimitIgnoresBurstable(t *testing.T) {
|
|||||||
quotaAccessor, _ := newQuotaAccessor(kubeClient)
|
quotaAccessor, _ := newQuotaAccessor(kubeClient)
|
||||||
quotaAccessor.indexer = indexer
|
quotaAccessor.indexer = indexer
|
||||||
go quotaAccessor.Run(stopCh)
|
go quotaAccessor.Run(stopCh)
|
||||||
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), 5, stopCh)
|
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), nil, 5, stopCh)
|
||||||
|
|
||||||
defer utilruntime.HandleCrash()
|
defer utilruntime.HandleCrash()
|
||||||
handler := "aAdmission{
|
handler := "aAdmission{
|
||||||
@ -846,7 +846,7 @@ func TestAdmissionSetsMissingNamespace(t *testing.T) {
|
|||||||
quotaAccessor, _ := newQuotaAccessor(kubeClient)
|
quotaAccessor, _ := newQuotaAccessor(kubeClient)
|
||||||
quotaAccessor.indexer = indexer
|
quotaAccessor.indexer = indexer
|
||||||
go quotaAccessor.Run(stopCh)
|
go quotaAccessor.Run(stopCh)
|
||||||
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), 5, stopCh)
|
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), nil, 5, stopCh)
|
||||||
evaluator.(*quotaEvaluator).registry = registry
|
evaluator.(*quotaEvaluator).registry = registry
|
||||||
|
|
||||||
defer utilruntime.HandleCrash()
|
defer utilruntime.HandleCrash()
|
||||||
|
@ -44,6 +44,8 @@ type Evaluator interface {
|
|||||||
|
|
||||||
type quotaEvaluator struct {
|
type quotaEvaluator struct {
|
||||||
quotaAccessor QuotaAccessor
|
quotaAccessor QuotaAccessor
|
||||||
|
// lockAquisitionFunc acquires any required locks and returns a cleanup method to defer
|
||||||
|
lockAquisitionFunc func([]api.ResourceQuota) func()
|
||||||
|
|
||||||
// registry that knows how to measure usage for objects
|
// registry that knows how to measure usage for objects
|
||||||
registry quota.Registry
|
registry quota.Registry
|
||||||
@ -96,9 +98,10 @@ func newAdmissionWaiter(a admission.Attributes) *admissionWaiter {
|
|||||||
// NewQuotaEvaluator configures an admission controller that can enforce quota constraints
|
// NewQuotaEvaluator configures an admission controller that can enforce quota constraints
|
||||||
// using the provided registry. The registry must have the capability to handle group/kinds that
|
// using the provided registry. The registry must have the capability to handle group/kinds that
|
||||||
// are persisted by the server this admission controller is intercepting
|
// are persisted by the server this admission controller is intercepting
|
||||||
func NewQuotaEvaluator(quotaAccessor QuotaAccessor, registry quota.Registry, workers int, stopCh <-chan struct{}) Evaluator {
|
func NewQuotaEvaluator(quotaAccessor QuotaAccessor, registry quota.Registry, lockAquisitionFunc func([]api.ResourceQuota) func(), workers int, stopCh <-chan struct{}) Evaluator {
|
||||||
return "aEvaluator{
|
return "aEvaluator{
|
||||||
quotaAccessor: quotaAccessor,
|
quotaAccessor: quotaAccessor,
|
||||||
|
lockAquisitionFunc: lockAquisitionFunc,
|
||||||
|
|
||||||
registry: registry,
|
registry: registry,
|
||||||
|
|
||||||
@ -169,6 +172,11 @@ func (e *quotaEvaluator) checkAttributes(ns string, admissionAttributes []*admis
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if e.lockAquisitionFunc != nil {
|
||||||
|
releaseLocks := e.lockAquisitionFunc(quotas)
|
||||||
|
defer releaseLocks()
|
||||||
|
}
|
||||||
|
|
||||||
e.checkQuotas(quotas, admissionAttributes, 3)
|
e.checkQuotas(quotas, admissionAttributes, 3)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user