Improve quota integration test to not use events, reduce number of pods provisioned

derekwaynecarr
2016-05-10 16:00:24 -04:00
parent 31970780cc
commit fc3e71894d
4 changed files with 90 additions and 40 deletions


@@ -28,13 +28,15 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/quota"
"k8s.io/kubernetes/pkg/quota/install"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
)
func init() {
admission.RegisterPlugin("ResourceQuota",
func(client clientset.Interface, config io.Reader) (admission.Interface, error) {
registry := install.NewRegistry(client)
return NewResourceQuota(client, registry, 5)
// TODO: expose a stop channel in admission factory
return NewResourceQuota(client, registry, 5, make(chan struct{}))
})
}
@@ -53,12 +55,14 @@ type liveLookupEntry struct {
// NewResourceQuota configures an admission controller that can enforce quota constraints
// using the provided registry. The registry must have the capability to handle group/kinds that
// are persisted by the server this admission controller is intercepting
func NewResourceQuota(client clientset.Interface, registry quota.Registry, numEvaluators int) (admission.Interface, error) {
func NewResourceQuota(client clientset.Interface, registry quota.Registry, numEvaluators int, stopCh <-chan struct{}) (admission.Interface, error) {
evaluator, err := newQuotaEvaluator(client, registry)
if err != nil {
return nil, err
}
evaluator.Run(numEvaluators)
defer utilruntime.HandleCrash()
go evaluator.Run(numEvaluators, stopCh)
return &quotaAdmission{
Handler: admission.NewHandler(admission.Create, admission.Update),
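
With the new signature the caller owns the stop channel: NewResourceQuota starts the evaluator workers in a goroutine, and they keep running until that channel is closed. A minimal sketch of a caller-side wrapper (the helper newStoppableQuotaAdmission and its cleanup func are hypothetical, not part of this commit; import paths are taken from the surrounding tree):

package resourcequota

import (
	"k8s.io/kubernetes/pkg/admission"
	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
	"k8s.io/kubernetes/pkg/quota/install"
)

// newStoppableQuotaAdmission is a hypothetical wrapper: the caller keeps the
// returned stop func and invokes it to shut down the evaluator workers that
// NewResourceQuota now launches.
func newStoppableQuotaAdmission(client clientset.Interface) (admission.Interface, func(), error) {
	stopCh := make(chan struct{})
	registry := install.NewRegistry(client)
	handler, err := NewResourceQuota(client, registry, 5, stopCh)
	if err != nil {
		close(stopCh)
		return nil, nil, err
	}
	return handler, func() { close(stopCh) }, nil
}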


@@ -22,7 +22,7 @@ import (
"testing"
"time"
"github.com/hashicorp/golang-lru"
lru "github.com/hashicorp/golang-lru"
"k8s.io/kubernetes/pkg/admission"
"k8s.io/kubernetes/pkg/api"
@@ -36,6 +36,7 @@ import (
"k8s.io/kubernetes/pkg/quota/generic"
"k8s.io/kubernetes/pkg/quota/install"
"k8s.io/kubernetes/pkg/runtime"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/sets"
)
@@ -115,7 +116,9 @@ func TestPrettyPrint(t *testing.T) {
// TestAdmissionIgnoresDelete verifies that the admission controller ignores delete operations
func TestAdmissionIgnoresDelete(t *testing.T) {
kubeClient := fake.NewSimpleClientset()
handler, err := NewResourceQuota(kubeClient, install.NewRegistry(kubeClient), 5)
stopCh := make(chan struct{})
defer close(stopCh)
handler, err := NewResourceQuota(kubeClient, install.NewRegistry(kubeClient), 5, stopCh)
if err != nil {
t.Errorf("Unexpected error %v", err)
}
@@ -143,7 +146,10 @@ func TestAdmissionIgnoresSubresources(t *testing.T) {
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc})
evaluator, _ := newQuotaEvaluator(kubeClient, install.NewRegistry(kubeClient))
evaluator.indexer = indexer
evaluator.Run(5)
stopCh := make(chan struct{})
defer close(stopCh)
defer utilruntime.HandleCrash()
go evaluator.Run(5, stopCh)
handler := &quotaAdmission{
Handler: admission.NewHandler(admission.Create, admission.Update),
evaluator: evaluator,
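
Every test below repeats the same setup: build the evaluator, create a stop channel, defer its close, and start the workers with go evaluator.Run(5, stopCh). A hypothetical helper that factors this out could look like the sketch below (newHandlerForTest and its cleanup func are assumptions, and the cache/clientset import paths are taken from the tree at this commit):

package resourcequota

import (
	"k8s.io/kubernetes/pkg/admission"
	"k8s.io/kubernetes/pkg/client/cache"
	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
	"k8s.io/kubernetes/pkg/quota/install"
)

// newHandlerForTest is a hypothetical helper: it wires the evaluator the way
// the tests above do, starts its workers against a private stop channel, and
// returns a cleanup func the test should defer to stop them.
func newHandlerForTest(kubeClient clientset.Interface, indexer cache.Indexer) (admission.Interface, func(), error) {
	evaluator, err := newQuotaEvaluator(kubeClient, install.NewRegistry(kubeClient))
	if err != nil {
		return nil, nil, err
	}
	evaluator.indexer = indexer
	stopCh := make(chan struct{})
	go evaluator.Run(5, stopCh) // Run defers utilruntime.HandleCrash internally
	handler := &quotaAdmission{
		Handler:   admission.NewHandler(admission.Create, admission.Update),
		evaluator: evaluator,
	}
	return handler, func() { close(stopCh) }, nil
}

A test would then call handler, cleanup, err := newHandlerForTest(kubeClient, indexer) and defer cleanup(), instead of repeating the four setup lines.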
@@ -181,7 +187,10 @@ func TestAdmitBelowQuotaLimit(t *testing.T) {
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc})
evaluator, _ := newQuotaEvaluator(kubeClient, install.NewRegistry(kubeClient))
evaluator.indexer = indexer
evaluator.Run(5)
stopCh := make(chan struct{})
defer close(stopCh)
defer utilruntime.HandleCrash()
go evaluator.Run(5, stopCh)
handler := &quotaAdmission{
Handler: admission.NewHandler(admission.Create, admission.Update),
evaluator: evaluator,
@@ -255,7 +264,10 @@ func TestAdmitExceedQuotaLimit(t *testing.T) {
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc})
evaluator, _ := newQuotaEvaluator(kubeClient, install.NewRegistry(kubeClient))
evaluator.indexer = indexer
evaluator.Run(5)
stopCh := make(chan struct{})
defer close(stopCh)
defer utilruntime.HandleCrash()
go evaluator.Run(5, stopCh)
handler := &quotaAdmission{
Handler: admission.NewHandler(admission.Create, admission.Update),
evaluator: evaluator,
@@ -293,7 +305,10 @@ func TestAdmitEnforceQuotaConstraints(t *testing.T) {
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc})
evaluator, _ := newQuotaEvaluator(kubeClient, install.NewRegistry(kubeClient))
evaluator.indexer = indexer
evaluator.Run(5)
stopCh := make(chan struct{})
defer close(stopCh)
defer utilruntime.HandleCrash()
go evaluator.Run(5, stopCh)
handler := &quotaAdmission{
Handler: admission.NewHandler(admission.Create, admission.Update),
evaluator: evaluator,
@@ -334,7 +349,10 @@ func TestAdmitPodInNamespaceWithoutQuota(t *testing.T) {
evaluator, _ := newQuotaEvaluator(kubeClient, install.NewRegistry(kubeClient))
evaluator.indexer = indexer
evaluator.liveLookupCache = liveLookupCache
evaluator.Run(5)
stopCh := make(chan struct{})
defer close(stopCh)
defer utilruntime.HandleCrash()
go evaluator.Run(5, stopCh)
handler := &quotaAdmission{
Handler: admission.NewHandler(admission.Create, admission.Update),
evaluator: evaluator,
@@ -394,7 +412,10 @@ func TestAdmitBelowTerminatingQuotaLimit(t *testing.T) {
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc})
evaluator, _ := newQuotaEvaluator(kubeClient, install.NewRegistry(kubeClient))
evaluator.indexer = indexer
evaluator.Run(5)
stopCh := make(chan struct{})
defer close(stopCh)
defer utilruntime.HandleCrash()
go evaluator.Run(5, stopCh)
handler := &quotaAdmission{
Handler: admission.NewHandler(admission.Create, admission.Update),
evaluator: evaluator,
@@ -493,7 +514,10 @@ func TestAdmitBelowBestEffortQuotaLimit(t *testing.T) {
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc})
evaluator, _ := newQuotaEvaluator(kubeClient, install.NewRegistry(kubeClient))
evaluator.indexer = indexer
evaluator.Run(5)
stopCh := make(chan struct{})
defer close(stopCh)
defer utilruntime.HandleCrash()
go evaluator.Run(5, stopCh)
handler := &quotaAdmission{
Handler: admission.NewHandler(admission.Create, admission.Update),
evaluator: evaluator,
@@ -579,7 +603,10 @@ func TestAdmitBestEffortQuotaLimitIgnoresBurstable(t *testing.T) {
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc})
evaluator, _ := newQuotaEvaluator(kubeClient, install.NewRegistry(kubeClient))
evaluator.indexer = indexer
evaluator.Run(5)
stopCh := make(chan struct{})
defer close(stopCh)
defer utilruntime.HandleCrash()
go evaluator.Run(5, stopCh)
handler := &quotaAdmission{
Handler: admission.NewHandler(admission.Create, admission.Update),
evaluator: evaluator,
@@ -692,7 +719,10 @@ func TestAdmissionSetsMissingNamespace(t *testing.T) {
evaluator, _ := newQuotaEvaluator(kubeClient, install.NewRegistry(kubeClient))
evaluator.indexer = indexer
evaluator.registry = registry
evaluator.Run(5)
stopCh := make(chan struct{})
defer close(stopCh)
defer utilruntime.HandleCrash()
go evaluator.Run(5, stopCh)
handler := &quotaAdmission{
Handler: admission.NewHandler(admission.Create, admission.Update),
evaluator: evaluator,


@@ -21,7 +21,8 @@ import (
"sync"
"time"
"github.com/hashicorp/golang-lru"
"github.com/golang/glog"
lru "github.com/hashicorp/golang-lru"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
@@ -126,25 +127,35 @@ func newQuotaEvaluator(client clientset.Interface, registry quota.Registry) (*qu
}
// Run begins watching and syncing.
func (e *quotaEvaluator) Run(workers int) {
func (e *quotaEvaluator) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
for i := 0; i < workers; i++ {
go wait.Until(e.doWork, time.Second, make(chan struct{}))
go wait.Until(e.doWork, time.Second, stopCh)
}
<-stopCh
glog.Infof("Shutting down quota evaluator")
e.queue.ShutDown()
}
func (e *quotaEvaluator) doWork() {
workFunc := func() bool {
ns, admissionAttributes, quit := e.getWork()
if quit {
return true
}
defer e.completeWork(ns)
if len(admissionAttributes) == 0 {
return false
}
e.checkAttributes(ns, admissionAttributes)
return false
}
for {
func() {
ns, admissionAttributes := e.getWork()
defer e.completeWork(ns)
if len(admissionAttributes) == 0 {
return
}
e.checkAttributes(ns, admissionAttributes)
}()
if quit := workFunc(); quit {
glog.Infof("quota evaluator worker shutdown")
return
}
}
}
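
The shutdown protocol introduced here is the usual workqueue pattern: closing stopCh stops wait.Until from re-running workers, and queue.ShutDown() makes Get return shutdown=true so a worker blocked in Get can exit. A standalone sketch of that interaction (package paths assumed from the tree at this commit; queue contents and sleeps are illustrative only):

package main

import (
	"fmt"
	"time"

	"k8s.io/kubernetes/pkg/util/wait"
	"k8s.io/kubernetes/pkg/util/workqueue"
)

func main() {
	queue := workqueue.New()
	stopCh := make(chan struct{})

	// worker mirrors doWork above: loop until the queue reports shutdown.
	worker := func() {
		for {
			item, shutdown := queue.Get()
			if shutdown {
				fmt.Println("worker shutting down")
				return
			}
			fmt.Println("processing", item)
			queue.Done(item)
		}
	}

	for i := 0; i < 2; i++ {
		go wait.Until(worker, time.Second, stopCh)
	}

	queue.Add("namespace-a")
	queue.Add("namespace-b")
	time.Sleep(100 * time.Millisecond)

	// Mirrors Run above: the stop channel ends the wait.Until loops, then
	// ShutDown unblocks any worker parked in Get so it can return.
	close(stopCh)
	queue.ShutDown()
	time.Sleep(100 * time.Millisecond)
}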
@@ -434,8 +445,11 @@ func (e *quotaEvaluator) completeWork(ns string) {
e.inProgress.Delete(ns)
}
func (e *quotaEvaluator) getWork() (string, []*admissionWaiter) {
uncastNS, _ := e.queue.Get()
func (e *quotaEvaluator) getWork() (string, []*admissionWaiter, bool) {
uncastNS, shutdown := e.queue.Get()
if shutdown {
return "", []*admissionWaiter{}, shutdown
}
ns := uncastNS.(string)
e.workLock.Lock()
@@ -450,12 +464,12 @@ func (e *quotaEvaluator) getWork() (string, []*admissionWaiter) {
if len(work) != 0 {
e.inProgress.Insert(ns)
return ns, work
return ns, work, false
}
e.queue.Done(ns)
e.inProgress.Delete(ns)
return ns, []*admissionWaiter{}
return ns, []*admissionWaiter{}, false
}
func (e *quotaEvaluator) getQuotas(namespace string) ([]api.ResourceQuota, error) {
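
The extra return value lets a caller tell "nothing pending for this namespace" (empty slice, quit=false) apart from "queue shut down" (quit=true). A hypothetical method restating the contract that doWork's workFunc relies on (drainOnce is illustrative only and assumes the unexported evaluator fields in this file):

// drainOnce returns true only when the queue has been shut down, telling the
// worker loop to exit; an empty attribute list with quit=false just means
// there is nothing to admit for that namespace right now.
func (e *quotaEvaluator) drainOnce() bool {
	ns, admissionAttributes, quit := e.getWork()
	if quit {
		return true
	}
	defer e.completeWork(ns)
	if len(admissionAttributes) > 0 {
		e.checkAttributes(ns, admissionAttributes)
	}
	return false
}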