From 3fad0cb52a0521e4094789ebbb69bcd5126c451c Mon Sep 17 00:00:00 2001 From: Derek Carr Date: Sat, 18 Feb 2017 12:10:22 -0500 Subject: [PATCH] Implement support for limited resources in quota --- hack/.linted_packages | 2 + hack/local-up-cluster.sh | 2 + plugin/pkg/admission/resourcequota/BUILD | 15 +- .../pkg/admission/resourcequota/admission.go | 29 +- .../admission/resourcequota/admission_test.go | 260 +++++++++++++++++- plugin/pkg/admission/resourcequota/config.go | 72 +++++ .../pkg/admission/resourcequota/controller.go | 88 +++++- test/integration/quota/quota_test.go | 111 +++++++- 8 files changed, 548 insertions(+), 31 deletions(-) create mode 100644 plugin/pkg/admission/resourcequota/config.go diff --git a/hack/.linted_packages b/hack/.linted_packages index 11847a3178d..8c2dc80afd7 100644 --- a/hack/.linted_packages +++ b/hack/.linted_packages @@ -260,6 +260,8 @@ plugin/pkg/admission/gc plugin/pkg/admission/imagepolicy plugin/pkg/admission/namespace/autoprovision plugin/pkg/admission/namespace/exists +plugin/pkg/admission/resourcequota/apis/resourcequota/install +plugin/pkg/admission/resourcequota/apis/resourcequota/validation plugin/pkg/admission/securitycontext/scdeny plugin/pkg/auth plugin/pkg/auth/authorizer diff --git a/hack/local-up-cluster.sh b/hack/local-up-cluster.sh index 8e947cb5477..6a2ee6c0c24 100755 --- a/hack/local-up-cluster.sh +++ b/hack/local-up-cluster.sh @@ -73,6 +73,7 @@ AUTH_ARGS=${AUTH_ARGS:-""} KUBE_CACHE_MUTATION_DETECTOR="${KUBE_CACHE_MUTATION_DETECTOR:-true}" export KUBE_CACHE_MUTATION_DETECTOR +ADMISSION_CONTROL_CONFIG_FILE=${ADMISSION_CONTROL_CONFIG_FILE:-""} # START_MODE can be 'all', 'kubeletonly', or 'nokubelet' START_MODE=${START_MODE:-"all"} @@ -431,6 +432,7 @@ function start_apiserver { --service-account-key-file="${SERVICE_ACCOUNT_KEY}" \ --service-account-lookup="${SERVICE_ACCOUNT_LOOKUP}" \ --admission-control="${ADMISSION_CONTROL}" \ + --admission-control-config-file="${ADMISSION_CONTROL_CONFIG_FILE}" \ --bind-address="${API_BIND_ADDR}" \ --secure-port="${API_SECURE_PORT}" \ --tls-cert-file="${CERT_DIR}/serving-kube-apiserver.crt" \ diff --git a/plugin/pkg/admission/resourcequota/BUILD b/plugin/pkg/admission/resourcequota/BUILD index 435b4942640..c693c4f7119 100644 --- a/plugin/pkg/admission/resourcequota/BUILD +++ b/plugin/pkg/admission/resourcequota/BUILD @@ -12,6 +12,7 @@ go_library( name = "go_default_library", srcs = [ "admission.go", + "config.go", "controller.go", "doc.go", "resource_access.go", @@ -24,11 +25,19 @@ go_library( "//pkg/quota:go_default_library", "//pkg/quota/install:go_default_library", "//pkg/util/workqueue/prometheus:go_default_library", + "//plugin/pkg/admission/resourcequota/apis/resourcequota:go_default_library", + "//plugin/pkg/admission/resourcequota/apis/resourcequota/install:go_default_library", + "//plugin/pkg/admission/resourcequota/apis/resourcequota/v1alpha1:go_default_library", + "//plugin/pkg/admission/resourcequota/apis/resourcequota/validation:go_default_library", "//vendor:github.com/golang/glog", "//vendor:github.com/hashicorp/golang-lru", "//vendor:k8s.io/apimachinery/pkg/api/meta", + "//vendor:k8s.io/apimachinery/pkg/apimachinery/announced", + "//vendor:k8s.io/apimachinery/pkg/apimachinery/registered", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", "//vendor:k8s.io/apimachinery/pkg/runtime", + "//vendor:k8s.io/apimachinery/pkg/runtime/schema", + "//vendor:k8s.io/apimachinery/pkg/runtime/serializer", "//vendor:k8s.io/apimachinery/pkg/util/runtime", "//vendor:k8s.io/apimachinery/pkg/util/sets", "//vendor:k8s.io/apimachinery/pkg/util/wait", @@ -51,6 +60,7 @@ go_test( "//pkg/quota:go_default_library", "//pkg/quota/generic:go_default_library", "//pkg/quota/install:go_default_library", + "//plugin/pkg/admission/resourcequota/apis/resourcequota:go_default_library", "//vendor:github.com/hashicorp/golang-lru", "//vendor:k8s.io/apimachinery/pkg/api/resource", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", @@ -71,6 +81,9 @@ filegroup( filegroup( name = "all-srcs", - srcs = [":package-srcs"], + srcs = [ + ":package-srcs", + "//plugin/pkg/admission/resourcequota/apis/resourcequota:all-srcs", + ], tags = ["automanaged"], ) diff --git a/plugin/pkg/admission/resourcequota/admission.go b/plugin/pkg/admission/resourcequota/admission.go index 95438166b6f..2d9ee4f7f5b 100644 --- a/plugin/pkg/admission/resourcequota/admission.go +++ b/plugin/pkg/admission/resourcequota/admission.go @@ -27,22 +27,35 @@ import ( kubeapiserveradmission "k8s.io/kubernetes/pkg/kubeapiserver/admission" "k8s.io/kubernetes/pkg/quota" "k8s.io/kubernetes/pkg/quota/install" + resourcequotaapi "k8s.io/kubernetes/plugin/pkg/admission/resourcequota/apis/resourcequota" + "k8s.io/kubernetes/plugin/pkg/admission/resourcequota/apis/resourcequota/validation" ) func init() { admission.RegisterPlugin("ResourceQuota", func(config io.Reader) (admission.Interface, error) { + // load the configuration provided (if any) + configuration, err := LoadConfiguration(config) + if err != nil { + return nil, err + } + // validate the configuration (if any) + if configuration != nil { + if errs := validation.ValidateConfiguration(configuration); len(errs) != 0 { + return nil, errs.ToAggregate() + } + } // NOTE: we do not provide informers to the registry because admission level decisions // does not require us to open watches for all items tracked by quota. registry := install.NewRegistry(nil, nil) - return NewResourceQuota(registry, 5, make(chan struct{})) + return NewResourceQuota(registry, configuration, 5, make(chan struct{})) }) } // quotaAdmission implements an admission controller that can enforce quota constraints type quotaAdmission struct { *admission.Handler - + config *resourcequotaapi.Configuration stopCh <-chan struct{} registry quota.Registry numEvaluators int @@ -59,12 +72,13 @@ type liveLookupEntry struct { // NewResourceQuota configures an admission controller that can enforce quota constraints // using the provided registry. The registry must have the capability to handle group/kinds that // are persisted by the server this admission controller is intercepting -func NewResourceQuota(registry quota.Registry, numEvaluators int, stopCh <-chan struct{}) (admission.Interface, error) { +func NewResourceQuota(registry quota.Registry, config *resourcequotaapi.Configuration, numEvaluators int, stopCh <-chan struct{}) (admission.Interface, error) { return "aAdmission{ Handler: admission.NewHandler(admission.Create, admission.Update), stopCh: stopCh, registry: registry, numEvaluators: numEvaluators, + config: config, }, nil } @@ -77,7 +91,7 @@ func (a *quotaAdmission) SetInternalClientSet(client internalclientset.Interface } go quotaAccessor.Run(a.stopCh) - a.evaluator = NewQuotaEvaluator(quotaAccessor, a.registry, nil, a.numEvaluators, a.stopCh) + a.evaluator = NewQuotaEvaluator(quotaAccessor, a.registry, nil, a.config, a.numEvaluators, a.stopCh) } // Validate ensures an authorizer is set. @@ -89,11 +103,10 @@ func (a *quotaAdmission) Validate() error { } // Admit makes admission decisions while enforcing quota -func (q *quotaAdmission) Admit(a admission.Attributes) (err error) { +func (a *quotaAdmission) Admit(attr admission.Attributes) (err error) { // ignore all operations that correspond to sub-resource actions - if a.GetSubresource() != "" { + if attr.GetSubresource() != "" { return nil } - - return q.evaluator.Evaluate(a) + return a.evaluator.Evaluate(attr) } diff --git a/plugin/pkg/admission/resourcequota/admission_test.go b/plugin/pkg/admission/resourcequota/admission_test.go index 650cea31bd5..0bbc6b2df5b 100644 --- a/plugin/pkg/admission/resourcequota/admission_test.go +++ b/plugin/pkg/admission/resourcequota/admission_test.go @@ -36,6 +36,7 @@ import ( "k8s.io/kubernetes/pkg/quota" "k8s.io/kubernetes/pkg/quota/generic" "k8s.io/kubernetes/pkg/quota/install" + resourcequotaapi "k8s.io/kubernetes/plugin/pkg/admission/resourcequota/apis/resourcequota" ) func getResourceList(cpu, memory string) api.ResourceList { @@ -130,7 +131,8 @@ func TestAdmissionIgnoresDelete(t *testing.T) { quotaAccessor, _ := newQuotaAccessor(kubeClient) quotaAccessor.indexer = indexer go quotaAccessor.Run(stopCh) - evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(nil, nil), nil, 5, stopCh) + config := &resourcequotaapi.Configuration{} + evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(nil, nil), nil, config, 5, stopCh) handler := "aAdmission{ Handler: admission.NewHandler(admission.Create, admission.Update), @@ -164,7 +166,8 @@ func TestAdmissionIgnoresSubresources(t *testing.T) { quotaAccessor, _ := newQuotaAccessor(kubeClient) quotaAccessor.indexer = indexer go quotaAccessor.Run(stopCh) - evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(nil, nil), nil, 5, stopCh) + config := &resourcequotaapi.Configuration{} + evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(nil, nil), nil, config, 5, stopCh) handler := "aAdmission{ Handler: admission.NewHandler(admission.Create, admission.Update), @@ -207,7 +210,8 @@ func TestAdmitBelowQuotaLimit(t *testing.T) { quotaAccessor, _ := newQuotaAccessor(kubeClient) quotaAccessor.indexer = indexer go quotaAccessor.Run(stopCh) - evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(nil, nil), nil, 5, stopCh) + config := &resourcequotaapi.Configuration{} + evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(nil, nil), nil, config, 5, stopCh) handler := "aAdmission{ Handler: admission.NewHandler(admission.Create, admission.Update), @@ -289,7 +293,8 @@ func TestAdmitHandlesOldObjects(t *testing.T) { quotaAccessor, _ := newQuotaAccessor(kubeClient) quotaAccessor.indexer = indexer go quotaAccessor.Run(stopCh) - evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(nil, nil), nil, 5, stopCh) + config := &resourcequotaapi.Configuration{} + evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(nil, nil), nil, config, 5, stopCh) handler := "aAdmission{ Handler: admission.NewHandler(admission.Create, admission.Update), @@ -385,7 +390,8 @@ func TestAdmitHandlesCreatingUpdates(t *testing.T) { quotaAccessor, _ := newQuotaAccessor(kubeClient) quotaAccessor.indexer = indexer go quotaAccessor.Run(stopCh) - evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(nil, nil), nil, 5, stopCh) + config := &resourcequotaapi.Configuration{} + evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(nil, nil), nil, config, 5, stopCh) handler := "aAdmission{ Handler: admission.NewHandler(admission.Create, admission.Update), @@ -478,7 +484,8 @@ func TestAdmitExceedQuotaLimit(t *testing.T) { quotaAccessor, _ := newQuotaAccessor(kubeClient) quotaAccessor.indexer = indexer go quotaAccessor.Run(stopCh) - evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(nil, nil), nil, 5, stopCh) + config := &resourcequotaapi.Configuration{} + evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(nil, nil), nil, config, 5, stopCh) handler := "aAdmission{ Handler: admission.NewHandler(admission.Create, admission.Update), @@ -521,7 +528,9 @@ func TestAdmitEnforceQuotaConstraints(t *testing.T) { quotaAccessor, _ := newQuotaAccessor(kubeClient) quotaAccessor.indexer = indexer go quotaAccessor.Run(stopCh) - evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(nil, nil), nil, 5, stopCh) + config := &resourcequotaapi.Configuration{} + + evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(nil, nil), nil, config, 5, stopCh) handler := "aAdmission{ Handler: admission.NewHandler(admission.Create, admission.Update), @@ -574,7 +583,8 @@ func TestAdmitPodInNamespaceWithoutQuota(t *testing.T) { quotaAccessor.indexer = indexer quotaAccessor.liveLookupCache = liveLookupCache go quotaAccessor.Run(stopCh) - evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(nil, nil), nil, 5, stopCh) + config := &resourcequotaapi.Configuration{} + evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(nil, nil), nil, config, 5, stopCh) handler := "aAdmission{ Handler: admission.NewHandler(admission.Create, admission.Update), @@ -639,7 +649,8 @@ func TestAdmitBelowTerminatingQuotaLimit(t *testing.T) { quotaAccessor, _ := newQuotaAccessor(kubeClient) quotaAccessor.indexer = indexer go quotaAccessor.Run(stopCh) - evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(nil, nil), nil, 5, stopCh) + config := &resourcequotaapi.Configuration{} + evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(nil, nil), nil, config, 5, stopCh) handler := "aAdmission{ Handler: admission.NewHandler(admission.Create, admission.Update), @@ -743,7 +754,8 @@ func TestAdmitBelowBestEffortQuotaLimit(t *testing.T) { quotaAccessor, _ := newQuotaAccessor(kubeClient) quotaAccessor.indexer = indexer go quotaAccessor.Run(stopCh) - evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(nil, nil), nil, 5, stopCh) + config := &resourcequotaapi.Configuration{} + evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(nil, nil), nil, config, 5, stopCh) handler := "aAdmission{ Handler: admission.NewHandler(admission.Create, admission.Update), @@ -834,7 +846,8 @@ func TestAdmitBestEffortQuotaLimitIgnoresBurstable(t *testing.T) { quotaAccessor, _ := newQuotaAccessor(kubeClient) quotaAccessor.indexer = indexer go quotaAccessor.Run(stopCh) - evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(nil, nil), nil, 5, stopCh) + config := &resourcequotaapi.Configuration{} + evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(nil, nil), nil, config, 5, stopCh) handler := "aAdmission{ Handler: admission.NewHandler(admission.Create, admission.Update), @@ -929,7 +942,8 @@ func TestAdmissionSetsMissingNamespace(t *testing.T) { quotaAccessor, _ := newQuotaAccessor(kubeClient) quotaAccessor.indexer = indexer go quotaAccessor.Run(stopCh) - evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(nil, nil), nil, 5, stopCh) + config := &resourcequotaapi.Configuration{} + evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(nil, nil), nil, config, 5, stopCh) evaluator.(*quotaEvaluator).registry = registry handler := "aAdmission{ @@ -974,7 +988,8 @@ func TestAdmitRejectsNegativeUsage(t *testing.T) { quotaAccessor, _ := newQuotaAccessor(kubeClient) quotaAccessor.indexer = indexer go quotaAccessor.Run(stopCh) - evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(nil, nil), nil, 5, stopCh) + config := &resourcequotaapi.Configuration{} + evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(nil, nil), nil, config, 5, stopCh) handler := "aAdmission{ Handler: admission.NewHandler(admission.Create, admission.Update), @@ -1019,7 +1034,8 @@ func TestAdmitWhenUnrelatedResourceExceedsQuota(t *testing.T) { quotaAccessor, _ := newQuotaAccessor(kubeClient) quotaAccessor.indexer = indexer go quotaAccessor.Run(stopCh) - evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(nil, nil), nil, 5, stopCh) + config := &resourcequotaapi.Configuration{} + evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(nil, nil), nil, config, 5, stopCh) handler := "aAdmission{ Handler: admission.NewHandler(admission.Create, admission.Update), @@ -1034,3 +1050,219 @@ func TestAdmitWhenUnrelatedResourceExceedsQuota(t *testing.T) { t.Errorf("Unexpected error: %v", err) } } + +// TestAdmitLimitedResourceNoQuota verifies if a limited resource is configured with no quota, it cannot be consumed. +func TestAdmitLimitedResourceNoQuota(t *testing.T) { + kubeClient := fake.NewSimpleClientset() + indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc}) + stopCh := make(chan struct{}) + defer close(stopCh) + + quotaAccessor, _ := newQuotaAccessor(kubeClient) + quotaAccessor.indexer = indexer + go quotaAccessor.Run(stopCh) + + // disable consumption of cpu unless there is a covering quota. + config := &resourcequotaapi.Configuration{ + LimitedResources: []resourcequotaapi.LimitedResource{ + { + Resource: "pods", + MatchContains: []string{"cpu"}, + }, + }, + } + evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(nil, nil), nil, config, 5, stopCh) + + handler := "aAdmission{ + Handler: admission.NewHandler(admission.Create, admission.Update), + evaluator: evaluator, + } + newPod := validPod("not-allowed-pod", 1, getResourceRequirements(getResourceList("3", "2Gi"), getResourceList("", ""))) + err := handler.Admit(admission.NewAttributesRecord(newPod, nil, api.Kind("Pod").WithVersion("version"), newPod.Namespace, newPod.Name, api.Resource("pods").WithVersion("version"), "", admission.Create, nil)) + if err == nil { + t.Errorf("Expected an error for consuming a limited resource without quota.") + } +} + +// TestAdmitLimitedResourceNoQuotaIgnoresNonMatchingResources shows it ignores non matching resources in config. +func TestAdmitLimitedResourceNoQuotaIgnoresNonMatchingResources(t *testing.T) { + kubeClient := fake.NewSimpleClientset() + indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc}) + stopCh := make(chan struct{}) + defer close(stopCh) + + quotaAccessor, _ := newQuotaAccessor(kubeClient) + quotaAccessor.indexer = indexer + go quotaAccessor.Run(stopCh) + + // disable consumption of cpu unless there is a covering quota. + config := &resourcequotaapi.Configuration{ + LimitedResources: []resourcequotaapi.LimitedResource{ + { + Resource: "services", + MatchContains: []string{"services"}, + }, + }, + } + evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(nil, nil), nil, config, 5, stopCh) + + handler := "aAdmission{ + Handler: admission.NewHandler(admission.Create, admission.Update), + evaluator: evaluator, + } + newPod := validPod("allowed-pod", 1, getResourceRequirements(getResourceList("3", "2Gi"), getResourceList("", ""))) + err := handler.Admit(admission.NewAttributesRecord(newPod, nil, api.Kind("Pod").WithVersion("version"), newPod.Namespace, newPod.Name, api.Resource("pods").WithVersion("version"), "", admission.Create, nil)) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } +} + +// TestAdmitLimitedResourceWithQuota verifies if a limited resource is configured with quota, it can be consumed. +func TestAdmitLimitedResourceWithQuota(t *testing.T) { + resourceQuota := &api.ResourceQuota{ + ObjectMeta: metav1.ObjectMeta{Name: "quota", Namespace: "test", ResourceVersion: "124"}, + Status: api.ResourceQuotaStatus{ + Hard: api.ResourceList{ + api.ResourceRequestsCPU: resource.MustParse("10"), + }, + Used: api.ResourceList{ + api.ResourceRequestsCPU: resource.MustParse("1"), + }, + }, + } + kubeClient := fake.NewSimpleClientset(resourceQuota) + indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc}) + stopCh := make(chan struct{}) + defer close(stopCh) + + quotaAccessor, _ := newQuotaAccessor(kubeClient) + quotaAccessor.indexer = indexer + go quotaAccessor.Run(stopCh) + + // disable consumption of cpu unless there is a covering quota. + // disable consumption of cpu unless there is a covering quota. + config := &resourcequotaapi.Configuration{ + LimitedResources: []resourcequotaapi.LimitedResource{ + { + Resource: "pods", + MatchContains: []string{"requests.cpu"}, // match on "requests.cpu" only + }, + }, + } + evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(nil, nil), nil, config, 5, stopCh) + + handler := "aAdmission{ + Handler: admission.NewHandler(admission.Create, admission.Update), + evaluator: evaluator, + } + indexer.Add(resourceQuota) + newPod := validPod("allowed-pod", 1, getResourceRequirements(getResourceList("3", "2Gi"), getResourceList("", ""))) + err := handler.Admit(admission.NewAttributesRecord(newPod, nil, api.Kind("Pod").WithVersion("version"), newPod.Namespace, newPod.Name, api.Resource("pods").WithVersion("version"), "", admission.Create, nil)) + if err != nil { + t.Errorf("unexpected error: %v", err) + } +} + +// TestAdmitLimitedResourceWithMultipleQuota verifies if a limited resource is configured with quota, it can be consumed if one matches. +func TestAdmitLimitedResourceWithMultipleQuota(t *testing.T) { + resourceQuota1 := &api.ResourceQuota{ + ObjectMeta: metav1.ObjectMeta{Name: "quota1", Namespace: "test", ResourceVersion: "124"}, + Status: api.ResourceQuotaStatus{ + Hard: api.ResourceList{ + api.ResourceRequestsCPU: resource.MustParse("10"), + }, + Used: api.ResourceList{ + api.ResourceRequestsCPU: resource.MustParse("1"), + }, + }, + } + resourceQuota2 := &api.ResourceQuota{ + ObjectMeta: metav1.ObjectMeta{Name: "quota2", Namespace: "test", ResourceVersion: "124"}, + Status: api.ResourceQuotaStatus{ + Hard: api.ResourceList{ + api.ResourceMemory: resource.MustParse("10Gi"), + }, + Used: api.ResourceList{ + api.ResourceMemory: resource.MustParse("1Gi"), + }, + }, + } + kubeClient := fake.NewSimpleClientset(resourceQuota1, resourceQuota2) + indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc}) + stopCh := make(chan struct{}) + defer close(stopCh) + + quotaAccessor, _ := newQuotaAccessor(kubeClient) + quotaAccessor.indexer = indexer + go quotaAccessor.Run(stopCh) + + // disable consumption of cpu unless there is a covering quota. + // disable consumption of cpu unless there is a covering quota. + config := &resourcequotaapi.Configuration{ + LimitedResources: []resourcequotaapi.LimitedResource{ + { + Resource: "pods", + MatchContains: []string{"requests.cpu"}, // match on "requests.cpu" only + }, + }, + } + evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(nil, nil), nil, config, 5, stopCh) + + handler := "aAdmission{ + Handler: admission.NewHandler(admission.Create, admission.Update), + evaluator: evaluator, + } + indexer.Add(resourceQuota1) + indexer.Add(resourceQuota2) + newPod := validPod("allowed-pod", 1, getResourceRequirements(getResourceList("3", "2Gi"), getResourceList("", ""))) + err := handler.Admit(admission.NewAttributesRecord(newPod, nil, api.Kind("Pod").WithVersion("version"), newPod.Namespace, newPod.Name, api.Resource("pods").WithVersion("version"), "", admission.Create, nil)) + if err != nil { + t.Errorf("unexpected error: %v", err) + } +} + +// TestAdmitLimitedResourceWithQuotaThatDoesNotCover verifies if a limited resource is configured the quota must cover the resource. +func TestAdmitLimitedResourceWithQuotaThatDoesNotCover(t *testing.T) { + resourceQuota := &api.ResourceQuota{ + ObjectMeta: metav1.ObjectMeta{Name: "quota", Namespace: "test", ResourceVersion: "124"}, + Status: api.ResourceQuotaStatus{ + Hard: api.ResourceList{ + api.ResourceMemory: resource.MustParse("10Gi"), + }, + Used: api.ResourceList{ + api.ResourceMemory: resource.MustParse("1Gi"), + }, + }, + } + kubeClient := fake.NewSimpleClientset(resourceQuota) + indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc}) + stopCh := make(chan struct{}) + defer close(stopCh) + + quotaAccessor, _ := newQuotaAccessor(kubeClient) + quotaAccessor.indexer = indexer + go quotaAccessor.Run(stopCh) + + // disable consumption of cpu unless there is a covering quota. + // disable consumption of cpu unless there is a covering quota. + config := &resourcequotaapi.Configuration{ + LimitedResources: []resourcequotaapi.LimitedResource{ + { + Resource: "pods", + MatchContains: []string{"cpu"}, // match on "cpu" only + }, + }, + } + evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(nil, nil), nil, config, 5, stopCh) + + handler := "aAdmission{ + Handler: admission.NewHandler(admission.Create, admission.Update), + evaluator: evaluator, + } + indexer.Add(resourceQuota) + newPod := validPod("not-allowed-pod", 1, getResourceRequirements(getResourceList("3", "2Gi"), getResourceList("", ""))) + err := handler.Admit(admission.NewAttributesRecord(newPod, nil, api.Kind("Pod").WithVersion("version"), newPod.Namespace, newPod.Name, api.Resource("pods").WithVersion("version"), "", admission.Create, nil)) + if err == nil { + t.Fatalf("Expected an error since the quota did not cover cpu") + } +} diff --git a/plugin/pkg/admission/resourcequota/config.go b/plugin/pkg/admission/resourcequota/config.go new file mode 100644 index 00000000000..d706a9afb56 --- /dev/null +++ b/plugin/pkg/admission/resourcequota/config.go @@ -0,0 +1,72 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resourcequota + +import ( + "fmt" + "io" + "io/ioutil" + "os" + + "k8s.io/apimachinery/pkg/apimachinery/announced" + "k8s.io/apimachinery/pkg/apimachinery/registered" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/serializer" + resourcequotaapi "k8s.io/kubernetes/plugin/pkg/admission/resourcequota/apis/resourcequota" + "k8s.io/kubernetes/plugin/pkg/admission/resourcequota/apis/resourcequota/install" + resourcequotav1alpha1 "k8s.io/kubernetes/plugin/pkg/admission/resourcequota/apis/resourcequota/v1alpha1" +) + +var ( + groupFactoryRegistry = make(announced.APIGroupFactoryRegistry) + registry = registered.NewOrDie(os.Getenv("KUBE_API_VERSIONS")) + scheme = runtime.NewScheme() + codecs = serializer.NewCodecFactory(scheme) +) + +func init() { + install.Install(groupFactoryRegistry, registry, scheme) +} + +// LoadConfiguration loads the provided configuration. +func LoadConfiguration(config io.Reader) (*resourcequotaapi.Configuration, error) { + // if no config is provided, return a default configuration + if config == nil { + externalConfig := &resourcequotav1alpha1.Configuration{} + scheme.Default(externalConfig) + internalConfig := &resourcequotaapi.Configuration{} + if err := scheme.Convert(externalConfig, internalConfig, nil); err != nil { + return nil, err + } + return internalConfig, nil + } + // we have a config so parse it. + data, err := ioutil.ReadAll(config) + if err != nil { + return nil, err + } + decoder := codecs.UniversalDecoder() + decodedObj, err := runtime.Decode(decoder, data) + if err != nil { + return nil, err + } + resourceQuotaConfiguration, ok := decodedObj.(*resourcequotaapi.Configuration) + if !ok { + return nil, fmt.Errorf("unexpected type: %T", decodedObj) + } + return resourceQuotaConfiguration, nil +} diff --git a/plugin/pkg/admission/resourcequota/controller.go b/plugin/pkg/admission/resourcequota/controller.go index 5fc9ab9a209..1c5a5002e56 100644 --- a/plugin/pkg/admission/resourcequota/controller.go +++ b/plugin/pkg/admission/resourcequota/controller.go @@ -26,6 +26,7 @@ import ( "github.com/golang/glog" "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/runtime/schema" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" @@ -34,6 +35,7 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/quota" _ "k8s.io/kubernetes/pkg/util/workqueue/prometheus" // for workqueue metric registration + resourcequotaapi "k8s.io/kubernetes/plugin/pkg/admission/resourcequota/apis/resourcequota" ) // Evaluator is used to see if quota constraints are satisfied. @@ -65,6 +67,9 @@ type quotaEvaluator struct { workers int stopCh <-chan struct{} init sync.Once + + // lets us know what resources are limited by default + config *resourcequotaapi.Configuration } type admissionWaiter struct { @@ -79,6 +84,7 @@ func (defaultDeny) Error() string { return "DEFAULT DENY" } +// IsDefaultDeny returns true if the error is defaultDeny func IsDefaultDeny(err error) bool { if err == nil { return false @@ -99,7 +105,11 @@ func newAdmissionWaiter(a admission.Attributes) *admissionWaiter { // NewQuotaEvaluator configures an admission controller that can enforce quota constraints // using the provided registry. The registry must have the capability to handle group/kinds that // are persisted by the server this admission controller is intercepting -func NewQuotaEvaluator(quotaAccessor QuotaAccessor, registry quota.Registry, lockAcquisitionFunc func([]api.ResourceQuota) func(), workers int, stopCh <-chan struct{}) Evaluator { +func NewQuotaEvaluator(quotaAccessor QuotaAccessor, registry quota.Registry, lockAcquisitionFunc func([]api.ResourceQuota) func(), config *resourcequotaapi.Configuration, workers int, stopCh <-chan struct{}) Evaluator { + // if we get a nil config, just create an empty default. + if config == nil { + config = &resourcequotaapi.Configuration{} + } return "aEvaluator{ quotaAccessor: quotaAccessor, lockAcquisitionFunc: lockAcquisitionFunc, @@ -113,6 +123,7 @@ func NewQuotaEvaluator(quotaAccessor QuotaAccessor, registry quota.Registry, loc workers: workers, stopCh: stopCh, + config: config, } } @@ -166,7 +177,9 @@ func (e *quotaEvaluator) checkAttributes(ns string, admissionAttributes []*admis } return } - if len(quotas) == 0 { + // if limited resources are disabled, we can just return safely when there are no quotas. + limitedResourcesDisabled := len(e.config.LimitedResources) == 0 + if len(quotas) == 0 && limitedResourcesDisabled { for _, admissionAttribute := range admissionAttributes { admissionAttribute.result = nil } @@ -316,6 +329,41 @@ func copyQuotas(in []api.ResourceQuota) ([]api.ResourceQuota, error) { return out, nil } +// filterLimitedResourcesByGroupResource filters the input that match the specified groupResource +func filterLimitedResourcesByGroupResource(input []resourcequotaapi.LimitedResource, groupResource schema.GroupResource) []resourcequotaapi.LimitedResource { + result := []resourcequotaapi.LimitedResource{} + for i := range input { + limitedResource := input[i] + limitedGroupResource := schema.GroupResource{Group: limitedResource.APIGroup, Resource: limitedResource.Resource} + if limitedGroupResource == groupResource { + result = append(result, limitedResource) + } + } + return result +} + +// limitedByDefault determines from the specfified usage and limitedResources the set of resources names +// that must be present in a covering quota. It returns an error if it was unable to determine if +// a resource was not limited by default. +func limitedByDefault(usage api.ResourceList, limitedResources []resourcequotaapi.LimitedResource) []api.ResourceName { + result := []api.ResourceName{} + for _, limitedResource := range limitedResources { + for k, v := range usage { + // if a resource is consumed, we need to check if it matches on the limited resource list. + if v.Sign() == 1 { + // if we get a match, we add it to limited set + for _, matchContain := range limitedResource.MatchContains { + if strings.Contains(string(k), matchContain) { + result = append(result, k) + break + } + } + } + } + } + return result +} + // checkRequest verifies that the request does not exceed any quota constraint. it returns a copy of quotas not yet persisted // that capture what the usage would be if the request succeeded. It return an error if the is insufficient quota to satisfy the request func (e *quotaEvaluator) checkRequest(quotas []api.ResourceQuota, a admission.Attributes) ([]api.ResourceQuota, error) { @@ -331,12 +379,30 @@ func (e *quotaEvaluator) checkRequest(quotas []api.ResourceQuota, a admission.At return quotas, nil } + // if we have limited resources enabled for this resource, always calculate usage + inputObject := a.GetObject() + + // determine the set of resource names that must exist in a covering quota + limitedResourceNames := []api.ResourceName{} + limitedResources := filterLimitedResourcesByGroupResource(e.config.LimitedResources, a.GetResource().GroupResource()) + if len(limitedResources) > 0 { + deltaUsage, err := evaluator.Usage(inputObject) + if err != nil { + return quotas, err + } + limitedResourceNames = limitedByDefault(deltaUsage, limitedResources) + } + limitedResourceNamesSet := quota.ToSet(limitedResourceNames) + // find the set of quotas that are pertinent to this request // reject if we match the quota, but usage is not calculated yet // reject if the input object does not satisfy quota constraints // if there are no pertinent quotas, we can just return - inputObject := a.GetObject() interestingQuotaIndexes := []int{} + // track the cumulative set of resources that were required across all quotas + // this is needed to know if we have satisfied any constraints where consumption + // was limited by default. + restrictedResourcesSet := sets.String{} for i := range quotas { resourceQuota := quotas[i] match, err := evaluator.Matches(&resourceQuota, inputObject) @@ -348,16 +414,26 @@ func (e *quotaEvaluator) checkRequest(quotas []api.ResourceQuota, a admission.At } hardResources := quota.ResourceNames(resourceQuota.Status.Hard) - requiredResources := evaluator.MatchingResources(hardResources) - if err := evaluator.Constraints(requiredResources, inputObject); err != nil { + restrictedResources := evaluator.MatchingResources(hardResources) + if err := evaluator.Constraints(restrictedResources, inputObject); err != nil { return nil, admission.NewForbidden(a, fmt.Errorf("failed quota: %s: %v", resourceQuota.Name, err)) } if !hasUsageStats(&resourceQuota) { return nil, admission.NewForbidden(a, fmt.Errorf("status unknown for quota: %s", resourceQuota.Name)) } - interestingQuotaIndexes = append(interestingQuotaIndexes, i) + localRestrictedResourcesSet := quota.ToSet(restrictedResources) + restrictedResourcesSet.Insert(localRestrictedResourcesSet.List()...) } + + // verify that for every resource that had limited by default consumption + // enabled that there was a corresponding quota that covered its use. + // if not, we reject the request. + hasNoCoveringQuota := limitedResourceNamesSet.Difference(restrictedResourcesSet) + if len(hasNoCoveringQuota) > 0 { + return quotas, fmt.Errorf("insufficient quota to consume: %v", strings.Join(hasNoCoveringQuota.List(), ",")) + } + if len(interestingQuotaIndexes) == 0 { return quotas, nil } diff --git a/test/integration/quota/quota_test.go b/test/integration/quota/quota_test.go index 3029b32a372..651cef5ed41 100644 --- a/test/integration/quota/quota_test.go +++ b/test/integration/quota/quota_test.go @@ -35,7 +35,7 @@ import ( "k8s.io/client-go/tools/record" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/v1" - "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" + clientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions" "k8s.io/kubernetes/pkg/controller" @@ -44,6 +44,7 @@ import ( kubeadmission "k8s.io/kubernetes/pkg/kubeapiserver/admission" quotainstall "k8s.io/kubernetes/pkg/quota/install" "k8s.io/kubernetes/plugin/pkg/admission/resourcequota" + resourcequotaapi "k8s.io/kubernetes/plugin/pkg/admission/resourcequota/apis/resourcequota" "k8s.io/kubernetes/test/integration/framework" ) @@ -65,7 +66,8 @@ func TestQuota(t *testing.T) { admissionCh := make(chan struct{}) clientset := clientset.NewForConfigOrDie(&restclient.Config{QPS: -1, Host: s.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) internalClientset := internalclientset.NewForConfigOrDie(&restclient.Config{QPS: -1, Host: s.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) - admission, err := resourcequota.NewResourceQuota(quotainstall.NewRegistry(nil, nil), 5, admissionCh) + config := &resourcequotaapi.Configuration{} + admission, err := resourcequota.NewResourceQuota(quotainstall.NewRegistry(nil, nil), config, 5, admissionCh) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -226,3 +228,108 @@ func scale(t *testing.T, namespace string, clientset *clientset.Clientset) { t.Fatalf("unexpected error: %v, ended with %v pods", err, len(pods.Items)) } } + +func TestQuotaLimitedResourceDenial(t *testing.T) { + // Set up a master + h := &framework.MasterHolder{Initialized: make(chan struct{})} + s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + <-h.Initialized + h.M.GenericAPIServer.Handler.ServeHTTP(w, req) + })) + defer s.Close() + + admissionCh := make(chan struct{}) + clientset := clientset.NewForConfigOrDie(&restclient.Config{QPS: -1, Host: s.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) + internalClientset := internalclientset.NewForConfigOrDie(&restclient.Config{QPS: -1, Host: s.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) + + // stop creation of a pod resource unless there is a quota + config := &resourcequotaapi.Configuration{ + LimitedResources: []resourcequotaapi.LimitedResource{ + { + Resource: "pods", + MatchContains: []string{"pods"}, + }, + }, + } + admission, err := resourcequota.NewResourceQuota(quotainstall.NewRegistry(nil, nil), config, 5, admissionCh) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + admission.(kubeadmission.WantsInternalClientSet).SetInternalClientSet(internalClientset) + defer close(admissionCh) + + masterConfig := framework.NewIntegrationTestMasterConfig() + masterConfig.GenericConfig.AdmissionControl = admission + framework.RunAMasterUsingServer(masterConfig, s, h) + + ns := framework.CreateTestingNamespace("quota", s, t) + defer framework.DeleteTestingNamespace(ns, s, t) + + controllerCh := make(chan struct{}) + defer close(controllerCh) + + informers := informers.NewSharedInformerFactory(clientset, controller.NoResyncPeriodFunc()) + rm := replicationcontroller.NewReplicationManager( + informers.Core().V1().Pods(), + informers.Core().V1().ReplicationControllers(), + clientset, + replicationcontroller.BurstReplicas, + 4096, + false, + ) + rm.SetEventRecorder(&record.FakeRecorder{}) + go rm.Run(3, controllerCh) + + resourceQuotaRegistry := quotainstall.NewRegistry(clientset, nil) + groupKindsToReplenish := []schema.GroupKind{ + api.Kind("Pod"), + } + resourceQuotaControllerOptions := &resourcequotacontroller.ResourceQuotaControllerOptions{ + KubeClient: clientset, + ResourceQuotaInformer: informers.Core().V1().ResourceQuotas(), + ResyncPeriod: controller.NoResyncPeriodFunc, + Registry: resourceQuotaRegistry, + GroupKindsToReplenish: groupKindsToReplenish, + ReplenishmentResyncPeriod: controller.NoResyncPeriodFunc, + ControllerFactory: resourcequotacontroller.NewReplenishmentControllerFactory(informers), + } + go resourcequotacontroller.NewResourceQuotaController(resourceQuotaControllerOptions).Run(2, controllerCh) + informers.Start(controllerCh) + + // try to create a pod + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + Namespace: ns.Name, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "container", + Image: "busybox", + }, + }, + }, + } + if _, err := clientset.Core().Pods(ns.Name).Create(pod); err == nil { + t.Fatalf("expected error for insufficient quota") + } + + // now create a covering quota + quota := &v1.ResourceQuota{ + ObjectMeta: metav1.ObjectMeta{ + Name: "quota", + Namespace: ns.Name, + }, + Spec: v1.ResourceQuotaSpec{ + Hard: v1.ResourceList{ + v1.ResourcePods: resource.MustParse("1000"), + }, + }, + } + waitForQuota(t, quota, clientset) + + if _, err := clientset.Core().Pods(ns.Name).Create(pod); err != nil { + t.Fatalf("unexpected error: %v", err) + } +}