Merge pull request #34841 from derekwaynecarr/quota-shared-informer

Automatic merge from submit-queue

quota controller uses informers if available for pod calculation

This PR does the following:
1. plumb informer factory into quota registry and evaluators
2. pod quota evaluator uses informers for determining aggregrate usage instead of making direct calls
3. admission code path does not use informers because
   1. we do not want to add new watches in apiserver
   2. admission code path does not require aggregate usage calculation

As a result, quota controller is much faster in re-calculating quota usage when it observes a pod deletion.

Follow-on PRs will make similar changes for other informer backed resources (pvcs next).

/cc @deads2k @mfojtik @smarterclayton @kubernetes/rh-cluster-infra
This commit is contained in:
Kubernetes Submit Queue 2016-10-31 14:34:57 -07:00 committed by GitHub
commit cbabb03acc
20 changed files with 156 additions and 72 deletions

View File

@ -323,7 +323,7 @@ func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, rootCl
} }
resourceQuotaControllerClient := client("resourcequota-controller") resourceQuotaControllerClient := client("resourcequota-controller")
resourceQuotaRegistry := quotainstall.NewRegistry(resourceQuotaControllerClient) resourceQuotaRegistry := quotainstall.NewRegistry(resourceQuotaControllerClient, sharedInformers)
groupKindsToReplenish := []unversioned.GroupKind{ groupKindsToReplenish := []unversioned.GroupKind{
api.Kind("Pod"), api.Kind("Pod"),
api.Kind("Service"), api.Kind("Service"),
@ -336,7 +336,7 @@ func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, rootCl
KubeClient: resourceQuotaControllerClient, KubeClient: resourceQuotaControllerClient,
ResyncPeriod: controller.StaticResyncPeriodFunc(s.ResourceQuotaSyncPeriod.Duration), ResyncPeriod: controller.StaticResyncPeriodFunc(s.ResourceQuotaSyncPeriod.Duration),
Registry: resourceQuotaRegistry, Registry: resourceQuotaRegistry,
ControllerFactory: resourcequotacontroller.NewReplenishmentControllerFactory(sharedInformers.Pods().Informer(), resourceQuotaControllerClient), ControllerFactory: resourcequotacontroller.NewReplenishmentControllerFactory(sharedInformers, resourceQuotaControllerClient),
ReplenishmentResyncPeriod: ResyncPeriod(s), ReplenishmentResyncPeriod: ResyncPeriod(s),
GroupKindsToReplenish: groupKindsToReplenish, GroupKindsToReplenish: groupKindsToReplenish,
} }

View File

@ -95,18 +95,20 @@ type ReplenishmentControllerFactory interface {
// replenishmentControllerFactory implements ReplenishmentControllerFactory // replenishmentControllerFactory implements ReplenishmentControllerFactory
type replenishmentControllerFactory struct { type replenishmentControllerFactory struct {
kubeClient clientset.Interface kubeClient clientset.Interface
podInformer cache.SharedInformer sharedInformerFactory informers.SharedInformerFactory
} }
// NewReplenishmentControllerFactory returns a factory that knows how to build controllers // NewReplenishmentControllerFactory returns a factory that knows how to build controllers
// to replenish resources when updated or deleted // to replenish resources when updated or deleted
func NewReplenishmentControllerFactory(podInformer cache.SharedInformer, kubeClient clientset.Interface) ReplenishmentControllerFactory { func NewReplenishmentControllerFactory(f informers.SharedInformerFactory, kubeClient clientset.Interface) ReplenishmentControllerFactory {
return &replenishmentControllerFactory{ return &replenishmentControllerFactory{
kubeClient: kubeClient, kubeClient: kubeClient,
podInformer: podInformer, sharedInformerFactory: f,
} }
} }
// NewReplenishmentControllerFactoryFromClient returns a factory that knows how to build controllers to replenish resources
// when updated or deleted using the specified client.
func NewReplenishmentControllerFactoryFromClient(kubeClient clientset.Interface) ReplenishmentControllerFactory { func NewReplenishmentControllerFactoryFromClient(kubeClient clientset.Interface) ReplenishmentControllerFactory {
return NewReplenishmentControllerFactory(nil, kubeClient) return NewReplenishmentControllerFactory(nil, kubeClient)
} }
@ -119,18 +121,16 @@ func (r *replenishmentControllerFactory) NewController(options *ReplenishmentCon
switch options.GroupKind { switch options.GroupKind {
case api.Kind("Pod"): case api.Kind("Pod"):
if r.podInformer != nil { if r.sharedInformerFactory != nil {
r.podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ podInformer := r.sharedInformerFactory.Pods().Informer()
podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
UpdateFunc: PodReplenishmentUpdateFunc(options), UpdateFunc: PodReplenishmentUpdateFunc(options),
DeleteFunc: ObjectReplenishmentDeleteFunc(options), DeleteFunc: ObjectReplenishmentDeleteFunc(options),
}) })
result = r.podInformer.GetController() result = podInformer.GetController()
break break
} }
result = informers.NewPodInformer(r.kubeClient, options.ResyncPeriod())
r.podInformer = informers.NewPodInformer(r.kubeClient, options.ResyncPeriod())
result = r.podInformer
case api.Kind("Service"): case api.Kind("Service"):
_, result = cache.NewInformer( _, result = cache.NewInformer(
&cache.ListWatch{ &cache.ListWatch{

View File

@ -107,7 +107,7 @@ func TestSyncResourceQuota(t *testing.T) {
resourceQuotaControllerOptions := &ResourceQuotaControllerOptions{ resourceQuotaControllerOptions := &ResourceQuotaControllerOptions{
KubeClient: kubeClient, KubeClient: kubeClient,
ResyncPeriod: controller.NoResyncPeriodFunc, ResyncPeriod: controller.NoResyncPeriodFunc,
Registry: install.NewRegistry(kubeClient), Registry: install.NewRegistry(kubeClient, nil),
GroupKindsToReplenish: []unversioned.GroupKind{ GroupKindsToReplenish: []unversioned.GroupKind{
api.Kind("Pod"), api.Kind("Pod"),
api.Kind("Service"), api.Kind("Service"),
@ -192,7 +192,7 @@ func TestSyncResourceQuotaSpecChange(t *testing.T) {
resourceQuotaControllerOptions := &ResourceQuotaControllerOptions{ resourceQuotaControllerOptions := &ResourceQuotaControllerOptions{
KubeClient: kubeClient, KubeClient: kubeClient,
ResyncPeriod: controller.NoResyncPeriodFunc, ResyncPeriod: controller.NoResyncPeriodFunc,
Registry: install.NewRegistry(kubeClient), Registry: install.NewRegistry(kubeClient, nil),
GroupKindsToReplenish: []unversioned.GroupKind{ GroupKindsToReplenish: []unversioned.GroupKind{
api.Kind("Pod"), api.Kind("Pod"),
api.Kind("Service"), api.Kind("Service"),
@ -280,7 +280,7 @@ func TestSyncResourceQuotaSpecHardChange(t *testing.T) {
resourceQuotaControllerOptions := &ResourceQuotaControllerOptions{ resourceQuotaControllerOptions := &ResourceQuotaControllerOptions{
KubeClient: kubeClient, KubeClient: kubeClient,
ResyncPeriod: controller.NoResyncPeriodFunc, ResyncPeriod: controller.NoResyncPeriodFunc,
Registry: install.NewRegistry(kubeClient), Registry: install.NewRegistry(kubeClient, nil),
GroupKindsToReplenish: []unversioned.GroupKind{ GroupKindsToReplenish: []unversioned.GroupKind{
api.Kind("Pod"), api.Kind("Pod"),
api.Kind("Service"), api.Kind("Service"),
@ -368,7 +368,7 @@ func TestSyncResourceQuotaNoChange(t *testing.T) {
resourceQuotaControllerOptions := &ResourceQuotaControllerOptions{ resourceQuotaControllerOptions := &ResourceQuotaControllerOptions{
KubeClient: kubeClient, KubeClient: kubeClient,
ResyncPeriod: controller.NoResyncPeriodFunc, ResyncPeriod: controller.NoResyncPeriodFunc,
Registry: install.NewRegistry(kubeClient), Registry: install.NewRegistry(kubeClient, nil),
GroupKindsToReplenish: []unversioned.GroupKind{ GroupKindsToReplenish: []unversioned.GroupKind{
api.Kind("Pod"), api.Kind("Pod"),
api.Kind("Service"), api.Kind("Service"),
@ -400,7 +400,7 @@ func TestAddQuota(t *testing.T) {
resourceQuotaControllerOptions := &ResourceQuotaControllerOptions{ resourceQuotaControllerOptions := &ResourceQuotaControllerOptions{
KubeClient: kubeClient, KubeClient: kubeClient,
ResyncPeriod: controller.NoResyncPeriodFunc, ResyncPeriod: controller.NoResyncPeriodFunc,
Registry: install.NewRegistry(kubeClient), Registry: install.NewRegistry(kubeClient, nil),
GroupKindsToReplenish: []unversioned.GroupKind{ GroupKindsToReplenish: []unversioned.GroupKind{
api.Kind("Pod"), api.Kind("Pod"),
api.Kind("ReplicationController"), api.Kind("ReplicationController"),

View File

@ -31,6 +31,7 @@ go_library(
"//pkg/api/unversioned:go_default_library", "//pkg/api/unversioned:go_default_library",
"//pkg/api/validation:go_default_library", "//pkg/api/validation:go_default_library",
"//pkg/client/clientset_generated/internalclientset:go_default_library", "//pkg/client/clientset_generated/internalclientset:go_default_library",
"//pkg/controller/informers:go_default_library",
"//pkg/kubelet/qos:go_default_library", "//pkg/kubelet/qos:go_default_library",
"//pkg/quota:go_default_library", "//pkg/quota:go_default_library",
"//pkg/quota/generic:go_default_library", "//pkg/quota/generic:go_default_library",

View File

@ -38,8 +38,16 @@ func NewConfigMapEvaluator(kubeClient clientset.Interface) quota.Evaluator {
MatchesScopeFunc: generic.MatchesNoScopeFunc, MatchesScopeFunc: generic.MatchesNoScopeFunc,
ConstraintsFunc: generic.ObjectCountConstraintsFunc(api.ResourceConfigMaps), ConstraintsFunc: generic.ObjectCountConstraintsFunc(api.ResourceConfigMaps),
UsageFunc: generic.ObjectCountUsageFunc(api.ResourceConfigMaps), UsageFunc: generic.ObjectCountUsageFunc(api.ResourceConfigMaps),
ListFuncByNamespace: func(namespace string, options api.ListOptions) (runtime.Object, error) { ListFuncByNamespace: func(namespace string, options api.ListOptions) ([]runtime.Object, error) {
return kubeClient.Core().ConfigMaps(namespace).List(options) itemList, err := kubeClient.Core().ConfigMaps(namespace).List(options)
if err != nil {
return nil, err
}
results := make([]runtime.Object, 0, len(itemList.Items))
for i := range itemList.Items {
results = append(results, &itemList.Items[i])
}
return results, nil
}, },
} }
} }

View File

@ -43,8 +43,16 @@ func NewPersistentVolumeClaimEvaluator(kubeClient clientset.Interface) quota.Eva
MatchesScopeFunc: generic.MatchesNoScopeFunc, MatchesScopeFunc: generic.MatchesNoScopeFunc,
ConstraintsFunc: PersistentVolumeClaimConstraintsFunc, ConstraintsFunc: PersistentVolumeClaimConstraintsFunc,
UsageFunc: PersistentVolumeClaimUsageFunc, UsageFunc: PersistentVolumeClaimUsageFunc,
ListFuncByNamespace: func(namespace string, options api.ListOptions) (runtime.Object, error) { ListFuncByNamespace: func(namespace string, options api.ListOptions) ([]runtime.Object, error) {
return kubeClient.Core().PersistentVolumeClaims(namespace).List(options) itemList, err := kubeClient.Core().PersistentVolumeClaims(namespace).List(options)
if err != nil {
return nil, err
}
results := make([]runtime.Object, 0, len(itemList.Items))
for i := range itemList.Items {
results = append(results, &itemList.Items[i])
}
return results, nil
}, },
} }
} }

View File

@ -23,8 +23,10 @@ import (
"k8s.io/kubernetes/pkg/admission" "k8s.io/kubernetes/pkg/admission"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/resource" "k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/api/validation" "k8s.io/kubernetes/pkg/api/validation"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/controller/informers"
"k8s.io/kubernetes/pkg/kubelet/qos" "k8s.io/kubernetes/pkg/kubelet/qos"
"k8s.io/kubernetes/pkg/quota" "k8s.io/kubernetes/pkg/quota"
"k8s.io/kubernetes/pkg/quota/generic" "k8s.io/kubernetes/pkg/quota/generic"
@ -33,8 +35,27 @@ import (
"k8s.io/kubernetes/pkg/util/validation/field" "k8s.io/kubernetes/pkg/util/validation/field"
) )
// listPodsByNamespaceFuncUsingClient returns a pod listing function based on the provided client.
func listPodsByNamespaceFuncUsingClient(kubeClient clientset.Interface) generic.ListFuncByNamespace {
// TODO: ideally, we could pass dynamic client pool down into this code, and have one way of doing this.
// unfortunately, dynamic client works with Unstructured objects, and when we calculate Usage, we require
// structured objects.
return func(namespace string, options api.ListOptions) ([]runtime.Object, error) {
itemList, err := kubeClient.Core().Pods(namespace).List(options)
if err != nil {
return nil, err
}
results := make([]runtime.Object, 0, len(itemList.Items))
for i := range itemList.Items {
results = append(results, &itemList.Items[i])
}
return results, nil
}
}
// NewPodEvaluator returns an evaluator that can evaluate pods // NewPodEvaluator returns an evaluator that can evaluate pods
func NewPodEvaluator(kubeClient clientset.Interface) quota.Evaluator { // if the specified shared informer factory is not nil, evaluator may use it to support listing functions.
func NewPodEvaluator(kubeClient clientset.Interface, f informers.SharedInformerFactory) quota.Evaluator {
computeResources := []api.ResourceName{ computeResources := []api.ResourceName{
api.ResourceCPU, api.ResourceCPU,
api.ResourceMemory, api.ResourceMemory,
@ -44,6 +65,10 @@ func NewPodEvaluator(kubeClient clientset.Interface) quota.Evaluator {
api.ResourceLimitsMemory, api.ResourceLimitsMemory,
} }
allResources := append(computeResources, api.ResourcePods) allResources := append(computeResources, api.ResourcePods)
listFuncByNamespace := listPodsByNamespaceFuncUsingClient(kubeClient)
if f != nil {
listFuncByNamespace = generic.ListResourceUsingInformerFunc(f, unversioned.GroupResource{Resource: "pods"})
}
return &generic.GenericEvaluator{ return &generic.GenericEvaluator{
Name: "Evaluator.Pod", Name: "Evaluator.Pod",
InternalGroupKind: api.Kind("Pod"), InternalGroupKind: api.Kind("Pod"),
@ -59,9 +84,7 @@ func NewPodEvaluator(kubeClient clientset.Interface) quota.Evaluator {
MatchedResourceNames: allResources, MatchedResourceNames: allResources,
MatchesScopeFunc: PodMatchesScopeFunc, MatchesScopeFunc: PodMatchesScopeFunc,
UsageFunc: PodUsageFunc, UsageFunc: PodUsageFunc,
ListFuncByNamespace: func(namespace string, options api.ListOptions) (runtime.Object, error) { ListFuncByNamespace: listFuncByNamespace,
return kubeClient.Core().Pods(namespace).List(options)
},
} }
} }

View File

@ -99,7 +99,7 @@ func TestPodConstraintsFunc(t *testing.T) {
func TestPodEvaluatorUsage(t *testing.T) { func TestPodEvaluatorUsage(t *testing.T) {
kubeClient := fake.NewSimpleClientset() kubeClient := fake.NewSimpleClientset()
evaluator := NewPodEvaluator(kubeClient) evaluator := NewPodEvaluator(kubeClient, nil)
testCases := map[string]struct { testCases := map[string]struct {
pod *api.Pod pod *api.Pod
usage api.ResourceList usage api.ResourceList

View File

@ -19,13 +19,15 @@ package core
import ( import (
"k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/api/unversioned"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/controller/informers"
"k8s.io/kubernetes/pkg/quota" "k8s.io/kubernetes/pkg/quota"
"k8s.io/kubernetes/pkg/quota/generic" "k8s.io/kubernetes/pkg/quota/generic"
) )
// NewRegistry returns a registry that knows how to deal with core kubernetes resources // NewRegistry returns a registry that knows how to deal with core kubernetes resources
func NewRegistry(kubeClient clientset.Interface) quota.Registry { // If an informer factory is provided, evaluators will use them.
pod := NewPodEvaluator(kubeClient) func NewRegistry(kubeClient clientset.Interface, f informers.SharedInformerFactory) quota.Registry {
pod := NewPodEvaluator(kubeClient, f)
service := NewServiceEvaluator(kubeClient) service := NewServiceEvaluator(kubeClient)
replicationController := NewReplicationControllerEvaluator(kubeClient) replicationController := NewReplicationControllerEvaluator(kubeClient)
resourceQuota := NewResourceQuotaEvaluator(kubeClient) resourceQuota := NewResourceQuotaEvaluator(kubeClient)

View File

@ -38,8 +38,16 @@ func NewReplicationControllerEvaluator(kubeClient clientset.Interface) quota.Eva
MatchesScopeFunc: generic.MatchesNoScopeFunc, MatchesScopeFunc: generic.MatchesNoScopeFunc,
ConstraintsFunc: generic.ObjectCountConstraintsFunc(api.ResourceReplicationControllers), ConstraintsFunc: generic.ObjectCountConstraintsFunc(api.ResourceReplicationControllers),
UsageFunc: generic.ObjectCountUsageFunc(api.ResourceReplicationControllers), UsageFunc: generic.ObjectCountUsageFunc(api.ResourceReplicationControllers),
ListFuncByNamespace: func(namespace string, options api.ListOptions) (runtime.Object, error) { ListFuncByNamespace: func(namespace string, options api.ListOptions) ([]runtime.Object, error) {
return kubeClient.Core().ReplicationControllers(namespace).List(options) itemList, err := kubeClient.Core().ReplicationControllers(namespace).List(options)
if err != nil {
return nil, err
}
results := make([]runtime.Object, 0, len(itemList.Items))
for i := range itemList.Items {
results = append(results, &itemList.Items[i])
}
return results, nil
}, },
} }
} }

View File

@ -38,8 +38,16 @@ func NewResourceQuotaEvaluator(kubeClient clientset.Interface) quota.Evaluator {
MatchesScopeFunc: generic.MatchesNoScopeFunc, MatchesScopeFunc: generic.MatchesNoScopeFunc,
ConstraintsFunc: generic.ObjectCountConstraintsFunc(api.ResourceQuotas), ConstraintsFunc: generic.ObjectCountConstraintsFunc(api.ResourceQuotas),
UsageFunc: generic.ObjectCountUsageFunc(api.ResourceQuotas), UsageFunc: generic.ObjectCountUsageFunc(api.ResourceQuotas),
ListFuncByNamespace: func(namespace string, options api.ListOptions) (runtime.Object, error) { ListFuncByNamespace: func(namespace string, options api.ListOptions) ([]runtime.Object, error) {
return kubeClient.Core().ResourceQuotas(namespace).List(options) itemList, err := kubeClient.Core().ResourceQuotas(namespace).List(options)
if err != nil {
return nil, err
}
results := make([]runtime.Object, 0, len(itemList.Items))
for i := range itemList.Items {
results = append(results, &itemList.Items[i])
}
return results, nil
}, },
} }
} }

View File

@ -38,8 +38,16 @@ func NewSecretEvaluator(kubeClient clientset.Interface) quota.Evaluator {
MatchesScopeFunc: generic.MatchesNoScopeFunc, MatchesScopeFunc: generic.MatchesNoScopeFunc,
ConstraintsFunc: generic.ObjectCountConstraintsFunc(api.ResourceSecrets), ConstraintsFunc: generic.ObjectCountConstraintsFunc(api.ResourceSecrets),
UsageFunc: generic.ObjectCountUsageFunc(api.ResourceSecrets), UsageFunc: generic.ObjectCountUsageFunc(api.ResourceSecrets),
ListFuncByNamespace: func(namespace string, options api.ListOptions) (runtime.Object, error) { ListFuncByNamespace: func(namespace string, options api.ListOptions) ([]runtime.Object, error) {
return kubeClient.Core().Secrets(namespace).List(options) itemList, err := kubeClient.Core().Secrets(namespace).List(options)
if err != nil {
return nil, err
}
results := make([]runtime.Object, 0, len(itemList.Items))
for i := range itemList.Items {
results = append(results, &itemList.Items[i])
}
return results, nil
}, },
} }
} }

View File

@ -48,8 +48,16 @@ func NewServiceEvaluator(kubeClient clientset.Interface) quota.Evaluator {
MatchesScopeFunc: generic.MatchesNoScopeFunc, MatchesScopeFunc: generic.MatchesNoScopeFunc,
ConstraintsFunc: ServiceConstraintsFunc, ConstraintsFunc: ServiceConstraintsFunc,
UsageFunc: ServiceUsageFunc, UsageFunc: ServiceUsageFunc,
ListFuncByNamespace: func(namespace string, options api.ListOptions) (runtime.Object, error) { ListFuncByNamespace: func(namespace string, options api.ListOptions) ([]runtime.Object, error) {
return kubeClient.Core().Services(namespace).List(options) itemList, err := kubeClient.Core().Services(namespace).List(options)
if err != nil {
return nil, err
}
results := make([]runtime.Object, 0, len(itemList.Items))
for i := range itemList.Items {
results = append(results, &itemList.Items[i])
}
return results, nil
}, },
} }
} }

View File

@ -20,9 +20,10 @@ go_library(
deps = [ deps = [
"//pkg/admission:go_default_library", "//pkg/admission:go_default_library",
"//pkg/api:go_default_library", "//pkg/api:go_default_library",
"//pkg/api/meta:go_default_library",
"//pkg/api/resource:go_default_library", "//pkg/api/resource:go_default_library",
"//pkg/api/unversioned:go_default_library", "//pkg/api/unversioned:go_default_library",
"//pkg/controller/informers:go_default_library",
"//pkg/labels:go_default_library",
"//pkg/quota:go_default_library", "//pkg/quota:go_default_library",
"//pkg/runtime:go_default_library", "//pkg/runtime:go_default_library",
], ],

View File

@ -21,13 +21,25 @@ import (
"k8s.io/kubernetes/pkg/admission" "k8s.io/kubernetes/pkg/admission"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/meta"
"k8s.io/kubernetes/pkg/api/resource" "k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/controller/informers"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/quota" "k8s.io/kubernetes/pkg/quota"
"k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime"
) )
// ListResourceUsingInformerFunc returns a listing function based on the shared informer factory for the specified resource.
func ListResourceUsingInformerFunc(f informers.SharedInformerFactory, groupResource unversioned.GroupResource) ListFuncByNamespace {
return func(namespace string, options api.ListOptions) ([]runtime.Object, error) {
informer, err := f.ForResource(groupResource)
if err != nil {
return nil, err
}
return informer.Lister().ByNamespace(namespace).List(options.LabelSelector)
}
}
// ConstraintsFunc takes a list of required resources that must match on the input item // ConstraintsFunc takes a list of required resources that must match on the input item
type ConstraintsFunc func(required []api.ResourceName, item runtime.Object) error type ConstraintsFunc func(required []api.ResourceName, item runtime.Object) error
@ -35,7 +47,7 @@ type ConstraintsFunc func(required []api.ResourceName, item runtime.Object) erro
type GetFuncByNamespace func(namespace, name string) (runtime.Object, error) type GetFuncByNamespace func(namespace, name string) (runtime.Object, error)
// ListFuncByNamespace knows how to list resources in a namespace // ListFuncByNamespace knows how to list resources in a namespace
type ListFuncByNamespace func(namespace string, options api.ListOptions) (runtime.Object, error) type ListFuncByNamespace func(namespace string, options api.ListOptions) ([]runtime.Object, error)
// MatchesScopeFunc knows how to evaluate if an object matches a scope // MatchesScopeFunc knows how to evaluate if an object matches a scope
type MatchesScopeFunc func(scope api.ResourceQuotaScope, object runtime.Object) bool type MatchesScopeFunc func(scope api.ResourceQuotaScope, object runtime.Object) bool
@ -171,18 +183,12 @@ func (g *GenericEvaluator) UsageStats(options quota.UsageStatsOptions) (quota.Us
for _, resourceName := range g.MatchedResourceNames { for _, resourceName := range g.MatchedResourceNames {
result.Used[resourceName] = resource.MustParse("0") result.Used[resourceName] = resource.MustParse("0")
} }
list, err := g.ListFuncByNamespace(options.Namespace, api.ListOptions{}) items, err := g.ListFuncByNamespace(options.Namespace, api.ListOptions{
LabelSelector: labels.Everything(),
})
if err != nil { if err != nil {
return result, fmt.Errorf("%s: Failed to list %v: %v", g.Name, g.GroupKind(), err) return result, fmt.Errorf("%s: Failed to list %v: %v", g.Name, g.GroupKind(), err)
} }
_, err = meta.ListAccessor(list)
if err != nil {
return result, fmt.Errorf("%s: Unable to understand list result, does not appear to be a list %#v", g.Name, list)
}
items, err := meta.ExtractList(list)
if err != nil {
return result, fmt.Errorf("%s: Unable to understand list result %#v (%v)", g.Name, list, err)
}
for _, item := range items { for _, item := range items {
// need to verify that the item matches the set of scopes // need to verify that the item matches the set of scopes
matchesScopes := true matchesScopes := true

View File

@ -16,6 +16,7 @@ go_library(
tags = ["automanaged"], tags = ["automanaged"],
deps = [ deps = [
"//pkg/client/clientset_generated/internalclientset:go_default_library", "//pkg/client/clientset_generated/internalclientset:go_default_library",
"//pkg/controller/informers:go_default_library",
"//pkg/quota:go_default_library", "//pkg/quota:go_default_library",
"//pkg/quota/evaluator/core:go_default_library", "//pkg/quota/evaluator/core:go_default_library",
], ],

View File

@ -18,13 +18,14 @@ package install
import ( import (
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/controller/informers"
"k8s.io/kubernetes/pkg/quota" "k8s.io/kubernetes/pkg/quota"
"k8s.io/kubernetes/pkg/quota/evaluator/core" "k8s.io/kubernetes/pkg/quota/evaluator/core"
) )
// NewRegistry returns a registry that knows how to deal kubernetes resources // NewRegistry returns a registry of quota evaluators.
// across API groups // If a shared informer factory is provided, it is used by evaluators rather than performing direct queries.
func NewRegistry(kubeClient clientset.Interface) quota.Registry { func NewRegistry(kubeClient clientset.Interface, f informers.SharedInformerFactory) quota.Registry {
// TODO: when quota supports resources in other api groups, we will need to merge // TODO: when quota supports resources in other api groups, we will need to merge
return core.NewRegistry(kubeClient) return core.NewRegistry(kubeClient, f)
} }

View File

@ -31,8 +31,9 @@ import (
func init() { func init() {
admission.RegisterPlugin("ResourceQuota", admission.RegisterPlugin("ResourceQuota",
func(client clientset.Interface, config io.Reader) (admission.Interface, error) { func(client clientset.Interface, config io.Reader) (admission.Interface, error) {
registry := install.NewRegistry(client) // NOTE: we do not provide informers to the registry because admission level decisions
// TODO: expose a stop channel in admission factory // does not require us to open watches for all items tracked by quota.
registry := install.NewRegistry(client, nil)
return NewResourceQuota(client, registry, 5, make(chan struct{})) return NewResourceQuota(client, registry, 5, make(chan struct{}))
}) })
} }

View File

@ -126,7 +126,7 @@ func TestAdmissionIgnoresDelete(t *testing.T) {
kubeClient := fake.NewSimpleClientset() kubeClient := fake.NewSimpleClientset()
stopCh := make(chan struct{}) stopCh := make(chan struct{})
defer close(stopCh) defer close(stopCh)
handler, err := NewResourceQuota(kubeClient, install.NewRegistry(kubeClient), 5, stopCh) handler, err := NewResourceQuota(kubeClient, install.NewRegistry(kubeClient, nil), 5, stopCh)
if err != nil { if err != nil {
t.Errorf("Unexpected error %v", err) t.Errorf("Unexpected error %v", err)
} }
@ -158,7 +158,7 @@ func TestAdmissionIgnoresSubresources(t *testing.T) {
quotaAccessor, _ := newQuotaAccessor(kubeClient) quotaAccessor, _ := newQuotaAccessor(kubeClient)
quotaAccessor.indexer = indexer quotaAccessor.indexer = indexer
go quotaAccessor.Run(stopCh) go quotaAccessor.Run(stopCh)
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), nil, 5, stopCh) evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient, nil), nil, 5, stopCh)
handler := &quotaAdmission{ handler := &quotaAdmission{
Handler: admission.NewHandler(admission.Create, admission.Update), Handler: admission.NewHandler(admission.Create, admission.Update),
@ -201,7 +201,7 @@ func TestAdmitBelowQuotaLimit(t *testing.T) {
quotaAccessor, _ := newQuotaAccessor(kubeClient) quotaAccessor, _ := newQuotaAccessor(kubeClient)
quotaAccessor.indexer = indexer quotaAccessor.indexer = indexer
go quotaAccessor.Run(stopCh) go quotaAccessor.Run(stopCh)
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), nil, 5, stopCh) evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient, nil), nil, 5, stopCh)
handler := &quotaAdmission{ handler := &quotaAdmission{
Handler: admission.NewHandler(admission.Create, admission.Update), Handler: admission.NewHandler(admission.Create, admission.Update),
@ -283,7 +283,7 @@ func TestAdmitHandlesOldObjects(t *testing.T) {
quotaAccessor, _ := newQuotaAccessor(kubeClient) quotaAccessor, _ := newQuotaAccessor(kubeClient)
quotaAccessor.indexer = indexer quotaAccessor.indexer = indexer
go quotaAccessor.Run(stopCh) go quotaAccessor.Run(stopCh)
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), nil, 5, stopCh) evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient, nil), nil, 5, stopCh)
handler := &quotaAdmission{ handler := &quotaAdmission{
Handler: admission.NewHandler(admission.Create, admission.Update), Handler: admission.NewHandler(admission.Create, admission.Update),
@ -379,7 +379,7 @@ func TestAdmitHandlesCreatingUpdates(t *testing.T) {
quotaAccessor, _ := newQuotaAccessor(kubeClient) quotaAccessor, _ := newQuotaAccessor(kubeClient)
quotaAccessor.indexer = indexer quotaAccessor.indexer = indexer
go quotaAccessor.Run(stopCh) go quotaAccessor.Run(stopCh)
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), nil, 5, stopCh) evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient, nil), nil, 5, stopCh)
handler := &quotaAdmission{ handler := &quotaAdmission{
Handler: admission.NewHandler(admission.Create, admission.Update), Handler: admission.NewHandler(admission.Create, admission.Update),
@ -472,7 +472,7 @@ func TestAdmitExceedQuotaLimit(t *testing.T) {
quotaAccessor, _ := newQuotaAccessor(kubeClient) quotaAccessor, _ := newQuotaAccessor(kubeClient)
quotaAccessor.indexer = indexer quotaAccessor.indexer = indexer
go quotaAccessor.Run(stopCh) go quotaAccessor.Run(stopCh)
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), nil, 5, stopCh) evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient, nil), nil, 5, stopCh)
handler := &quotaAdmission{ handler := &quotaAdmission{
Handler: admission.NewHandler(admission.Create, admission.Update), Handler: admission.NewHandler(admission.Create, admission.Update),
@ -515,7 +515,7 @@ func TestAdmitEnforceQuotaConstraints(t *testing.T) {
quotaAccessor, _ := newQuotaAccessor(kubeClient) quotaAccessor, _ := newQuotaAccessor(kubeClient)
quotaAccessor.indexer = indexer quotaAccessor.indexer = indexer
go quotaAccessor.Run(stopCh) go quotaAccessor.Run(stopCh)
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), nil, 5, stopCh) evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient, nil), nil, 5, stopCh)
handler := &quotaAdmission{ handler := &quotaAdmission{
Handler: admission.NewHandler(admission.Create, admission.Update), Handler: admission.NewHandler(admission.Create, admission.Update),
@ -568,7 +568,7 @@ func TestAdmitPodInNamespaceWithoutQuota(t *testing.T) {
quotaAccessor.indexer = indexer quotaAccessor.indexer = indexer
quotaAccessor.liveLookupCache = liveLookupCache quotaAccessor.liveLookupCache = liveLookupCache
go quotaAccessor.Run(stopCh) go quotaAccessor.Run(stopCh)
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), nil, 5, stopCh) evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient, nil), nil, 5, stopCh)
handler := &quotaAdmission{ handler := &quotaAdmission{
Handler: admission.NewHandler(admission.Create, admission.Update), Handler: admission.NewHandler(admission.Create, admission.Update),
@ -633,7 +633,7 @@ func TestAdmitBelowTerminatingQuotaLimit(t *testing.T) {
quotaAccessor, _ := newQuotaAccessor(kubeClient) quotaAccessor, _ := newQuotaAccessor(kubeClient)
quotaAccessor.indexer = indexer quotaAccessor.indexer = indexer
go quotaAccessor.Run(stopCh) go quotaAccessor.Run(stopCh)
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), nil, 5, stopCh) evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient, nil), nil, 5, stopCh)
handler := &quotaAdmission{ handler := &quotaAdmission{
Handler: admission.NewHandler(admission.Create, admission.Update), Handler: admission.NewHandler(admission.Create, admission.Update),
@ -737,7 +737,7 @@ func TestAdmitBelowBestEffortQuotaLimit(t *testing.T) {
quotaAccessor, _ := newQuotaAccessor(kubeClient) quotaAccessor, _ := newQuotaAccessor(kubeClient)
quotaAccessor.indexer = indexer quotaAccessor.indexer = indexer
go quotaAccessor.Run(stopCh) go quotaAccessor.Run(stopCh)
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), nil, 5, stopCh) evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient, nil), nil, 5, stopCh)
handler := &quotaAdmission{ handler := &quotaAdmission{
Handler: admission.NewHandler(admission.Create, admission.Update), Handler: admission.NewHandler(admission.Create, admission.Update),
@ -828,7 +828,7 @@ func TestAdmitBestEffortQuotaLimitIgnoresBurstable(t *testing.T) {
quotaAccessor, _ := newQuotaAccessor(kubeClient) quotaAccessor, _ := newQuotaAccessor(kubeClient)
quotaAccessor.indexer = indexer quotaAccessor.indexer = indexer
go quotaAccessor.Run(stopCh) go quotaAccessor.Run(stopCh)
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), nil, 5, stopCh) evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient, nil), nil, 5, stopCh)
handler := &quotaAdmission{ handler := &quotaAdmission{
Handler: admission.NewHandler(admission.Create, admission.Update), Handler: admission.NewHandler(admission.Create, admission.Update),
@ -945,7 +945,7 @@ func TestAdmissionSetsMissingNamespace(t *testing.T) {
quotaAccessor, _ := newQuotaAccessor(kubeClient) quotaAccessor, _ := newQuotaAccessor(kubeClient)
quotaAccessor.indexer = indexer quotaAccessor.indexer = indexer
go quotaAccessor.Run(stopCh) go quotaAccessor.Run(stopCh)
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), nil, 5, stopCh) evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient, nil), nil, 5, stopCh)
evaluator.(*quotaEvaluator).registry = registry evaluator.(*quotaEvaluator).registry = registry
handler := &quotaAdmission{ handler := &quotaAdmission{
@ -990,7 +990,7 @@ func TestAdmitRejectsNegativeUsage(t *testing.T) {
quotaAccessor, _ := newQuotaAccessor(kubeClient) quotaAccessor, _ := newQuotaAccessor(kubeClient)
quotaAccessor.indexer = indexer quotaAccessor.indexer = indexer
go quotaAccessor.Run(stopCh) go quotaAccessor.Run(stopCh)
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), nil, 5, stopCh) evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient, nil), nil, 5, stopCh)
handler := &quotaAdmission{ handler := &quotaAdmission{
Handler: admission.NewHandler(admission.Create, admission.Update), Handler: admission.NewHandler(admission.Create, admission.Update),
@ -1035,7 +1035,7 @@ func TestAdmitWhenUnrelatedResourceExceedsQuota(t *testing.T) {
quotaAccessor, _ := newQuotaAccessor(kubeClient) quotaAccessor, _ := newQuotaAccessor(kubeClient)
quotaAccessor.indexer = indexer quotaAccessor.indexer = indexer
go quotaAccessor.Run(stopCh) go quotaAccessor.Run(stopCh)
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), nil, 5, stopCh) evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient, nil), nil, 5, stopCh)
handler := &quotaAdmission{ handler := &quotaAdmission{
Handler: admission.NewHandler(admission.Create, admission.Update), Handler: admission.NewHandler(admission.Create, admission.Update),

View File

@ -64,7 +64,7 @@ func TestQuota(t *testing.T) {
admissionCh := make(chan struct{}) admissionCh := make(chan struct{})
clientset := clientset.NewForConfigOrDie(&restclient.Config{QPS: -1, Host: s.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(api.GroupName).GroupVersion}}) clientset := clientset.NewForConfigOrDie(&restclient.Config{QPS: -1, Host: s.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &registered.GroupOrDie(api.GroupName).GroupVersion}})
admission, err := resourcequota.NewResourceQuota(clientset, quotainstall.NewRegistry(clientset), 5, admissionCh) admission, err := resourcequota.NewResourceQuota(clientset, quotainstall.NewRegistry(clientset, nil), 5, admissionCh)
if err != nil { if err != nil {
t.Fatalf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
} }
@ -85,7 +85,7 @@ func TestQuota(t *testing.T) {
go replicationcontroller.NewReplicationManagerFromClientForIntegration(clientset, controller.NoResyncPeriodFunc, replicationcontroller.BurstReplicas, 4096). go replicationcontroller.NewReplicationManagerFromClientForIntegration(clientset, controller.NoResyncPeriodFunc, replicationcontroller.BurstReplicas, 4096).
Run(3, controllerCh) Run(3, controllerCh)
resourceQuotaRegistry := quotainstall.NewRegistry(clientset) resourceQuotaRegistry := quotainstall.NewRegistry(clientset, nil)
groupKindsToReplenish := []unversioned.GroupKind{ groupKindsToReplenish := []unversioned.GroupKind{
api.Kind("Pod"), api.Kind("Pod"),
} }