diff --git a/plugin/pkg/admission/resourcequota/admission.go b/plugin/pkg/admission/resourcequota/admission.go index d8a678b6952..8a0fd5d0a07 100644 --- a/plugin/pkg/admission/resourcequota/admission.go +++ b/plugin/pkg/admission/resourcequota/admission.go @@ -17,50 +17,31 @@ limitations under the License. package resourcequota import ( - "fmt" "io" - "math/rand" "strings" "time" - "github.com/hashicorp/golang-lru" - clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" "k8s.io/kubernetes/pkg/admission" "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/api/errors" - "k8s.io/kubernetes/pkg/api/meta" - "k8s.io/kubernetes/pkg/client/cache" "k8s.io/kubernetes/pkg/quota" "k8s.io/kubernetes/pkg/quota/install" - "k8s.io/kubernetes/pkg/runtime" - "k8s.io/kubernetes/pkg/watch" ) func init() { admission.RegisterPlugin("ResourceQuota", func(client clientset.Interface, config io.Reader) (admission.Interface, error) { registry := install.NewRegistry(client) - return NewResourceQuota(client, registry) + return NewResourceQuota(client, registry, 5) }) } // quotaAdmission implements an admission controller that can enforce quota constraints type quotaAdmission struct { *admission.Handler - // must be able to read/write ResourceQuota - client clientset.Interface - // indexer that holds quota objects by namespace - indexer cache.Indexer - // registry that knows how to measure usage for objects - registry quota.Registry - // liveLookups holds the last few live lookups we've done to help ammortize cost on repeated lookup failures. - // This let's us handle the case of latent caches, by looking up actual results for a namespace on cache miss/no results. - // We track the lookup result here so that for repeated requests, we don't look it up very often. - liveLookupCache *lru.Cache - liveTTL time.Duration + evaluator *quotaEvaluator } type liveLookupEntry struct { @@ -71,28 +52,16 @@ type liveLookupEntry struct { // NewResourceQuota configures an admission controller that can enforce quota constraints // using the provided registry. 
The registry must have the capability to handle group/kinds that // are persisted by the server this admission controller is intercepting -func NewResourceQuota(client clientset.Interface, registry quota.Registry) (admission.Interface, error) { - liveLookupCache, err := lru.New(100) +func NewResourceQuota(client clientset.Interface, registry quota.Registry, numEvaluators int) (admission.Interface, error) { + evaluator, err := newQuotaEvaluator(client, registry) if err != nil { return nil, err } - lw := &cache.ListWatch{ - ListFunc: func(options api.ListOptions) (runtime.Object, error) { - return client.Core().ResourceQuotas(api.NamespaceAll).List(options) - }, - WatchFunc: func(options api.ListOptions) (watch.Interface, error) { - return client.Core().ResourceQuotas(api.NamespaceAll).Watch(options) - }, - } - indexer, reflector := cache.NewNamespaceKeyedIndexerAndReflector(lw, &api.ResourceQuota{}, 0) - reflector.Run() + evaluator.Run(numEvaluators) + return "aAdmission{ - Handler: admission.NewHandler(admission.Create, admission.Update), - client: client, - indexer: indexer, - registry: registry, - liveLookupCache: liveLookupCache, - liveTTL: time.Duration(30 * time.Second), + Handler: admission.NewHandler(admission.Create, admission.Update), + evaluator: evaluator, }, nil } @@ -104,7 +73,7 @@ func (q *quotaAdmission) Admit(a admission.Attributes) (err error) { } // if we do not know how to evaluate use for this kind, just ignore - evaluators := q.registry.Evaluators() + evaluators := q.evaluator.registry.Evaluators() evaluator, found := evaluators[a.GetKind()] if !found { return nil @@ -118,183 +87,7 @@ func (q *quotaAdmission) Admit(a admission.Attributes) (err error) { return nil } - // determine if there are any quotas in this namespace - // if there are no quotas, we don't need to do anything - namespace, name := a.GetNamespace(), a.GetName() - items, err := q.indexer.Index("namespace", &api.ResourceQuota{ObjectMeta: api.ObjectMeta{Namespace: namespace, Name: ""}}) - if err != nil { - return admission.NewForbidden(a, fmt.Errorf("Error resolving quota.")) - } - // if there are no items held in our indexer, check our live-lookup LRU, if that misses, do the live lookup to prime it. - if len(items) == 0 { - lruItemObj, ok := q.liveLookupCache.Get(a.GetNamespace()) - if !ok || lruItemObj.(liveLookupEntry).expiry.Before(time.Now()) { - // TODO: If there are multiple operations at the same time and cache has just expired, - // this may cause multiple List operations being issued at the same time. - // If there is already in-flight List() for a given namespace, we should wait until - // it is finished and cache is updated instead of doing the same, also to avoid - // throttling - see #22422 for details. 
- liveList, err := q.client.Core().ResourceQuotas(namespace).List(api.ListOptions{}) - if err != nil { - return admission.NewForbidden(a, err) - } - newEntry := liveLookupEntry{expiry: time.Now().Add(q.liveTTL)} - for i := range liveList.Items { - newEntry.items = append(newEntry.items, &liveList.Items[i]) - } - q.liveLookupCache.Add(a.GetNamespace(), newEntry) - lruItemObj = newEntry - } - lruEntry := lruItemObj.(liveLookupEntry) - for i := range lruEntry.items { - items = append(items, lruEntry.items[i]) - } - } - // if there are still no items, we can return - if len(items) == 0 { - return nil - } - - // find the set of quotas that are pertinent to this request - // reject if we match the quota, but usage is not calculated yet - // reject if the input object does not satisfy quota constraints - // if there are no pertinent quotas, we can just return - inputObject := a.GetObject() - resourceQuotas := []*api.ResourceQuota{} - for i := range items { - resourceQuota := items[i].(*api.ResourceQuota) - match := evaluator.Matches(resourceQuota, inputObject) - if !match { - continue - } - hardResources := quota.ResourceNames(resourceQuota.Status.Hard) - evaluatorResources := evaluator.MatchesResources() - requiredResources := quota.Intersection(hardResources, evaluatorResources) - err := evaluator.Constraints(requiredResources, inputObject) - if err != nil { - return admission.NewForbidden(a, fmt.Errorf("Failed quota: %s: %v", resourceQuota.Name, err)) - } - if !hasUsageStats(resourceQuota) { - return admission.NewForbidden(a, fmt.Errorf("status unknown for quota: %s", resourceQuota.Name)) - } - resourceQuotas = append(resourceQuotas, resourceQuota) - } - if len(resourceQuotas) == 0 { - return nil - } - - // Usage of some resources cannot be counted in isolation. For example when - // the resource represents a number of unique references to external - // resource. In such a case an evaluator needs to process other objects in - // the same namespace which needs to be known. - if accessor, err := meta.Accessor(inputObject); namespace != "" && err == nil { - if accessor.GetNamespace() == "" { - accessor.SetNamespace(namespace) - } - } - - // there is at least one quota that definitely matches our object - // as a result, we need to measure the usage of this object for quota - // on updates, we need to subtract the previous measured usage - // if usage shows no change, just return since it has no impact on quota - deltaUsage := evaluator.Usage(inputObject) - if admission.Update == op { - prevItem, err := evaluator.Get(namespace, name) - if err != nil { - return admission.NewForbidden(a, fmt.Errorf("Unable to get previous: %v", err)) - } - prevUsage := evaluator.Usage(prevItem) - deltaUsage = quota.Subtract(deltaUsage, prevUsage) - } - if quota.IsZero(deltaUsage) { - return nil - } - - // TODO: Move to a bucketing work queue - // If we guaranteed that we processed the request in order it was received to server, we would reduce quota conflicts. - // Until we have the bucketing work queue, we jitter requests and retry on conflict. 
- numRetries := 10 - interval := time.Duration(rand.Int63n(90)+int64(10)) * time.Millisecond - - // seed the retry loop with the initial set of quotas to process (should reduce each iteration) - resourceQuotasToProcess := resourceQuotas - for retry := 1; retry <= numRetries; retry++ { - // the list of quotas we will try again if there is a version conflict - tryAgain := []*api.ResourceQuota{} - - // check that we pass all remaining quotas so we do not prematurely charge - // for each quota, mask the usage to the set of resources tracked by the quota - // if request + used > hard, return an error describing the failure - updatedUsage := map[string]api.ResourceList{} - for _, resourceQuota := range resourceQuotasToProcess { - hardResources := quota.ResourceNames(resourceQuota.Status.Hard) - requestedUsage := quota.Mask(deltaUsage, hardResources) - newUsage := quota.Add(resourceQuota.Status.Used, requestedUsage) - if allowed, exceeded := quota.LessThanOrEqual(newUsage, resourceQuota.Status.Hard); !allowed { - failedRequestedUsage := quota.Mask(requestedUsage, exceeded) - failedUsed := quota.Mask(resourceQuota.Status.Used, exceeded) - failedHard := quota.Mask(resourceQuota.Status.Hard, exceeded) - return admission.NewForbidden(a, - fmt.Errorf("exceeded quota: %s, requested: %s, used: %s, limited: %s", - resourceQuota.Name, - prettyPrint(failedRequestedUsage), - prettyPrint(failedUsed), - prettyPrint(failedHard))) - } - updatedUsage[resourceQuota.Name] = newUsage - } - - // update the status for each quota with its new usage - // if we get a conflict, get updated quota, and enqueue - for i, resourceQuota := range resourceQuotasToProcess { - newUsage := updatedUsage[resourceQuota.Name] - quotaToUpdate := &api.ResourceQuota{ - ObjectMeta: api.ObjectMeta{ - Name: resourceQuota.Name, - Namespace: resourceQuota.Namespace, - ResourceVersion: resourceQuota.ResourceVersion, - }, - Status: api.ResourceQuotaStatus{ - Hard: quota.Add(api.ResourceList{}, resourceQuota.Status.Hard), - Used: newUsage, - }, - } - _, err = q.client.Core().ResourceQuotas(quotaToUpdate.Namespace).UpdateStatus(quotaToUpdate) - if err != nil { - if !errors.IsConflict(err) { - return admission.NewForbidden(a, fmt.Errorf("Unable to update quota status: %s %v", resourceQuota.Name, err)) - } - // if we get a conflict, we get the latest copy of the quota documents that were not yet modified so we retry all with latest state. 
- for fetchIndex := i; fetchIndex < len(resourceQuotasToProcess); fetchIndex++ { - latestQuota, err := q.client.Core().ResourceQuotas(namespace).Get(resourceQuotasToProcess[fetchIndex].Name) - if err != nil { - return admission.NewForbidden(a, fmt.Errorf("Unable to get quota: %s %v", resourceQuotasToProcess[fetchIndex].Name, err)) - } - tryAgain = append(tryAgain, latestQuota) - } - break - } - } - - // all quotas were updated, so we can return - if len(tryAgain) == 0 { - return nil - } - - // we have concurrent requests to update quota, so look to retry if needed - // next iteration, we need to process the items that have to try again - // pause the specified interval to encourage jitter - if retry == numRetries { - names := []string{} - for _, quota := range tryAgain { - names = append(names, quota.Name) - } - return admission.NewForbidden(a, fmt.Errorf("Unable to update status for quota: %s, ", strings.Join(names, ","))) - } - resourceQuotasToProcess = tryAgain - time.Sleep(interval) - } - return nil + return q.evaluator.evaluate(a) } // prettyPrint formats a resource list for usage in errors diff --git a/plugin/pkg/admission/resourcequota/admission_test.go b/plugin/pkg/admission/resourcequota/admission_test.go index 048124f28da..67f6ba49576 100644 --- a/plugin/pkg/admission/resourcequota/admission_test.go +++ b/plugin/pkg/admission/resourcequota/admission_test.go @@ -75,7 +75,7 @@ func validPod(name string, numContainers int, resources api.ResourceRequirements // TestAdmissionIgnoresDelete verifies that the admission controller ignores delete operations func TestAdmissionIgnoresDelete(t *testing.T) { kubeClient := fake.NewSimpleClientset() - handler, err := NewResourceQuota(kubeClient, install.NewRegistry(kubeClient)) + handler, err := NewResourceQuota(kubeClient, install.NewRegistry(kubeClient), 5) if err != nil { t.Errorf("Unexpected error %v", err) } @@ -101,13 +101,14 @@ func TestAdmissionIgnoresSubresources(t *testing.T) { resourceQuota.Status.Used[api.ResourceMemory] = resource.MustParse("1Gi") kubeClient := fake.NewSimpleClientset(resourceQuota) indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc}) + evaluator, _ := newQuotaEvaluator(kubeClient, install.NewRegistry(kubeClient)) + evaluator.indexer = indexer + evaluator.Run(5) handler := "aAdmission{ - Handler: admission.NewHandler(admission.Create, admission.Update), - client: kubeClient, - indexer: indexer, - registry: install.NewRegistry(kubeClient), + Handler: admission.NewHandler(admission.Create, admission.Update), + evaluator: evaluator, } - handler.indexer.Add(resourceQuota) + indexer.Add(resourceQuota) newPod := validPod("123", 1, getResourceRequirements(getResourceList("100m", "2Gi"), getResourceList("", ""))) err := handler.Admit(admission.NewAttributesRecord(newPod, api.Kind("Pod"), newPod.Namespace, newPod.Name, api.Resource("pods"), "", admission.Create, nil)) if err == nil { @@ -138,13 +139,14 @@ func TestAdmitBelowQuotaLimit(t *testing.T) { } kubeClient := fake.NewSimpleClientset(resourceQuota) indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc}) + evaluator, _ := newQuotaEvaluator(kubeClient, install.NewRegistry(kubeClient)) + evaluator.indexer = indexer + evaluator.Run(5) handler := "aAdmission{ - Handler: admission.NewHandler(admission.Create, admission.Update), - client: kubeClient, - indexer: indexer, - registry: install.NewRegistry(kubeClient), + Handler: 
admission.NewHandler(admission.Create, admission.Update), + evaluator: evaluator, } - handler.indexer.Add(resourceQuota) + indexer.Add(resourceQuota) newPod := validPod("allowed-pod", 1, getResourceRequirements(getResourceList("100m", "2Gi"), getResourceList("", ""))) err := handler.Admit(admission.NewAttributesRecord(newPod, api.Kind("Pod"), newPod.Namespace, newPod.Name, api.Resource("pods"), "", admission.Create, nil)) if err != nil { @@ -165,8 +167,9 @@ func TestAdmitBelowQuotaLimit(t *testing.T) { t.Errorf("Expected actions:\n%v\n but got:\n%v\nDifference:\n%v", expectedActionSet, actionSet, expectedActionSet.Difference(actionSet)) } - lastActionIndex := len(kubeClient.Actions()) - 1 - usage := kubeClient.Actions()[lastActionIndex].(testcore.UpdateAction).GetObject().(*api.ResourceQuota) + decimatedActions := removeListWatch(kubeClient.Actions()) + lastActionIndex := len(decimatedActions) - 1 + usage := decimatedActions[lastActionIndex].(testcore.UpdateAction).GetObject().(*api.ResourceQuota) expectedUsage := api.ResourceQuota{ Status: api.ResourceQuotaStatus{ Hard: api.ResourceList{ @@ -210,13 +213,14 @@ func TestAdmitExceedQuotaLimit(t *testing.T) { } kubeClient := fake.NewSimpleClientset(resourceQuota) indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc}) + evaluator, _ := newQuotaEvaluator(kubeClient, install.NewRegistry(kubeClient)) + evaluator.indexer = indexer + evaluator.Run(5) handler := "aAdmission{ - Handler: admission.NewHandler(admission.Create, admission.Update), - client: kubeClient, - indexer: indexer, - registry: install.NewRegistry(kubeClient), + Handler: admission.NewHandler(admission.Create, admission.Update), + evaluator: evaluator, } - handler.indexer.Add(resourceQuota) + indexer.Add(resourceQuota) newPod := validPod("not-allowed-pod", 1, getResourceRequirements(getResourceList("3", "2Gi"), getResourceList("", ""))) err := handler.Admit(admission.NewAttributesRecord(newPod, api.Kind("Pod"), newPod.Namespace, newPod.Name, api.Resource("pods"), "", admission.Create, nil)) if err == nil { @@ -247,13 +251,14 @@ func TestAdmitEnforceQuotaConstraints(t *testing.T) { } kubeClient := fake.NewSimpleClientset(resourceQuota) indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc}) + evaluator, _ := newQuotaEvaluator(kubeClient, install.NewRegistry(kubeClient)) + evaluator.indexer = indexer + evaluator.Run(5) handler := "aAdmission{ - Handler: admission.NewHandler(admission.Create, admission.Update), - client: kubeClient, - indexer: indexer, - registry: install.NewRegistry(kubeClient), + Handler: admission.NewHandler(admission.Create, admission.Update), + evaluator: evaluator, } - handler.indexer.Add(resourceQuota) + indexer.Add(resourceQuota) newPod := validPod("not-allowed-pod", 1, getResourceRequirements(getResourceList("100m", "2Gi"), getResourceList("200m", ""))) err := handler.Admit(admission.NewAttributesRecord(newPod, api.Kind("Pod"), newPod.Namespace, newPod.Name, api.Resource("pods"), "", admission.Create, nil)) if err == nil { @@ -286,15 +291,16 @@ func TestAdmitPodInNamespaceWithoutQuota(t *testing.T) { if err != nil { t.Fatal(err) } + evaluator, _ := newQuotaEvaluator(kubeClient, install.NewRegistry(kubeClient)) + evaluator.indexer = indexer + evaluator.liveLookupCache = liveLookupCache + evaluator.Run(5) handler := "aAdmission{ - Handler: admission.NewHandler(admission.Create, admission.Update), - client: kubeClient, - indexer: indexer, - 
registry: install.NewRegistry(kubeClient), - liveLookupCache: liveLookupCache, + Handler: admission.NewHandler(admission.Create, admission.Update), + evaluator: evaluator, } // Add to the index - handler.indexer.Add(resourceQuota) + indexer.Add(resourceQuota) newPod := validPod("not-allowed-pod", 1, getResourceRequirements(getResourceList("100m", "2Gi"), getResourceList("200m", ""))) // Add to the lru cache so we do not do a live client lookup liveLookupCache.Add(newPod.Namespace, liveLookupEntry{expiry: time.Now().Add(time.Duration(30 * time.Second)), items: []*api.ResourceQuota{}}) @@ -346,14 +352,15 @@ func TestAdmitBelowTerminatingQuotaLimit(t *testing.T) { } kubeClient := fake.NewSimpleClientset(resourceQuotaTerminating, resourceQuotaNonTerminating) indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc}) + evaluator, _ := newQuotaEvaluator(kubeClient, install.NewRegistry(kubeClient)) + evaluator.indexer = indexer + evaluator.Run(5) handler := "aAdmission{ - Handler: admission.NewHandler(admission.Create, admission.Update), - client: kubeClient, - indexer: indexer, - registry: install.NewRegistry(kubeClient), + Handler: admission.NewHandler(admission.Create, admission.Update), + evaluator: evaluator, } - handler.indexer.Add(resourceQuotaNonTerminating) - handler.indexer.Add(resourceQuotaTerminating) + indexer.Add(resourceQuotaNonTerminating) + indexer.Add(resourceQuotaTerminating) // create a pod that has an active deadline newPod := validPod("allowed-pod", 1, getResourceRequirements(getResourceList("100m", "2Gi"), getResourceList("", ""))) @@ -378,8 +385,9 @@ func TestAdmitBelowTerminatingQuotaLimit(t *testing.T) { t.Errorf("Expected actions:\n%v\n but got:\n%v\nDifference:\n%v", expectedActionSet, actionSet, expectedActionSet.Difference(actionSet)) } - lastActionIndex := len(kubeClient.Actions()) - 1 - usage := kubeClient.Actions()[lastActionIndex].(testcore.UpdateAction).GetObject().(*api.ResourceQuota) + decimatedActions := removeListWatch(kubeClient.Actions()) + lastActionIndex := len(decimatedActions) - 1 + usage := decimatedActions[lastActionIndex].(testcore.UpdateAction).GetObject().(*api.ResourceQuota) // ensure only the quota-terminating was updated if usage.Name != resourceQuotaTerminating.Name { @@ -443,14 +451,15 @@ func TestAdmitBelowBestEffortQuotaLimit(t *testing.T) { } kubeClient := fake.NewSimpleClientset(resourceQuotaBestEffort, resourceQuotaNotBestEffort) indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc}) + evaluator, _ := newQuotaEvaluator(kubeClient, install.NewRegistry(kubeClient)) + evaluator.indexer = indexer + evaluator.Run(5) handler := "aAdmission{ - Handler: admission.NewHandler(admission.Create, admission.Update), - client: kubeClient, - indexer: indexer, - registry: install.NewRegistry(kubeClient), + Handler: admission.NewHandler(admission.Create, admission.Update), + evaluator: evaluator, } - handler.indexer.Add(resourceQuotaBestEffort) - handler.indexer.Add(resourceQuotaNotBestEffort) + indexer.Add(resourceQuotaBestEffort) + indexer.Add(resourceQuotaNotBestEffort) // create a pod that is best effort because it does not make a request for anything newPod := validPod("allowed-pod", 1, getResourceRequirements(getResourceList("", ""), getResourceList("", ""))) @@ -468,8 +477,9 @@ func TestAdmitBelowBestEffortQuotaLimit(t *testing.T) { if !actionSet.HasAll(expectedActionSet.List()...) 
{ t.Errorf("Expected actions:\n%v\n but got:\n%v\nDifference:\n%v", expectedActionSet, actionSet, expectedActionSet.Difference(actionSet)) } - lastActionIndex := len(kubeClient.Actions()) - 1 - usage := kubeClient.Actions()[lastActionIndex].(testcore.UpdateAction).GetObject().(*api.ResourceQuota) + decimatedActions := removeListWatch(kubeClient.Actions()) + lastActionIndex := len(decimatedActions) - 1 + usage := decimatedActions[lastActionIndex].(testcore.UpdateAction).GetObject().(*api.ResourceQuota) if usage.Name != resourceQuotaBestEffort.Name { t.Errorf("Incremented the wrong quota, expected %v, actual %v", resourceQuotaBestEffort.Name, usage.Name) @@ -495,6 +505,19 @@ func TestAdmitBelowBestEffortQuotaLimit(t *testing.T) { } } +func removeListWatch(in []testcore.Action) []testcore.Action { + decimatedActions := []testcore.Action{} + // list and watch resource quota is done to maintain our cache, so that's expected. Remove them from results + for i := range in { + if in[i].Matches("list", "resourcequotas") || in[i].Matches("watch", "resourcequotas") { + continue + } + + decimatedActions = append(decimatedActions, in[i]) + } + return decimatedActions +} + // TestAdmitBestEffortQuotaLimitIgnoresBurstable validates that a besteffort quota does not match a resource // guaranteed pod. func TestAdmitBestEffortQuotaLimitIgnoresBurstable(t *testing.T) { @@ -514,20 +537,23 @@ func TestAdmitBestEffortQuotaLimitIgnoresBurstable(t *testing.T) { } kubeClient := fake.NewSimpleClientset(resourceQuota) indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc}) + evaluator, _ := newQuotaEvaluator(kubeClient, install.NewRegistry(kubeClient)) + evaluator.indexer = indexer + evaluator.Run(5) handler := "aAdmission{ - Handler: admission.NewHandler(admission.Create, admission.Update), - client: kubeClient, - indexer: indexer, - registry: install.NewRegistry(kubeClient), + Handler: admission.NewHandler(admission.Create, admission.Update), + evaluator: evaluator, } - handler.indexer.Add(resourceQuota) + indexer.Add(resourceQuota) newPod := validPod("allowed-pod", 1, getResourceRequirements(getResourceList("100m", "1Gi"), getResourceList("", ""))) err := handler.Admit(admission.NewAttributesRecord(newPod, api.Kind("Pod"), newPod.Namespace, newPod.Name, api.Resource("pods"), "", admission.Create, nil)) if err != nil { t.Errorf("Unexpected error: %v", err) } - if len(kubeClient.Actions()) != 0 { - t.Errorf("Expected no client actions because the incoming pod did not match best effort quota") + + decimatedActions := removeListWatch(kubeClient.Actions()) + if len(decimatedActions) != 0 { + t.Errorf("Expected no client actions because the incoming pod did not match best effort quota: %v", kubeClient.Actions()) } } @@ -623,13 +649,15 @@ func TestAdmissionSetsMissingNamespace(t *testing.T) { podEvaluator.GroupKind(): podEvaluator, }, } + evaluator, _ := newQuotaEvaluator(kubeClient, install.NewRegistry(kubeClient)) + evaluator.indexer = indexer + evaluator.registry = registry + evaluator.Run(5) handler := "aAdmission{ - Handler: admission.NewHandler(admission.Create, admission.Update), - client: kubeClient, - indexer: indexer, - registry: registry, + Handler: admission.NewHandler(admission.Create, admission.Update), + evaluator: evaluator, } - handler.indexer.Add(resourceQuota) + indexer.Add(resourceQuota) newPod := validPod("pod-without-namespace", 1, getResourceRequirements(getResourceList("1", "2Gi"), getResourceList("", ""))) // unset the namespace diff 
--git a/plugin/pkg/admission/resourcequota/controller.go b/plugin/pkg/admission/resourcequota/controller.go new file mode 100644 index 00000000000..2ce6ed9a578 --- /dev/null +++ b/plugin/pkg/admission/resourcequota/controller.go @@ -0,0 +1,502 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resourcequota + +import ( + "fmt" + "sync" + "time" + + "github.com/hashicorp/golang-lru" + + clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" + + "k8s.io/kubernetes/pkg/admission" + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/meta" + "k8s.io/kubernetes/pkg/client/cache" + "k8s.io/kubernetes/pkg/quota" + "k8s.io/kubernetes/pkg/runtime" + utilruntime "k8s.io/kubernetes/pkg/util/runtime" + "k8s.io/kubernetes/pkg/util/sets" + "k8s.io/kubernetes/pkg/util/wait" + "k8s.io/kubernetes/pkg/util/workqueue" + "k8s.io/kubernetes/pkg/watch" +) + +type quotaEvaluator struct { + client clientset.Interface + + // indexer that holds quota objects by namespace + indexer cache.Indexer + // registry that knows how to measure usage for objects + registry quota.Registry + + // liveLookups holds the last few live lookups we've done to help amortize cost on repeated lookup failures. + // This lets us handle the case of latent caches, by looking up actual results for a namespace on cache miss/no results. + // We track the lookup result here so that for repeated requests, we don't look it up very often. + liveLookupCache *lru.Cache + liveTTL time.Duration + + // TODO these are used together to bucket items by namespace and then batch them up for processing. + // The technique is valuable for rollup activities to avoid fanout and reduce resource contention. + // We could move this into a library if another component needed it. + // queue is indexed by namespace, so that we bundle up on a per-namespace basis + queue *workqueue.Type + workLock sync.Mutex + work map[string][]*admissionWaiter + dirtyWork map[string][]*admissionWaiter + inProgress sets.String +} + +type admissionWaiter struct { + attributes admission.Attributes + finished chan struct{} + result error +} + +type defaultDeny struct{} + +func (defaultDeny) Error() string { + return "DEFAULT DENY" +} + +func IsDefaultDeny(err error) bool { + if err == nil { + return false + } + + _, ok := err.(defaultDeny) + return ok +} + +func newAdmissionWaiter(a admission.Attributes) *admissionWaiter { + return &admissionWaiter{ + attributes: a, + finished: make(chan struct{}), + result: defaultDeny{}, + } +} + +// newQuotaEvaluator configures an admission controller that can enforce quota constraints +// using the provided registry. 
The registry must have the capability to handle group/kinds that +// are persisted by the server this admission controller is intercepting +func newQuotaEvaluator(client clientset.Interface, registry quota.Registry) (*quotaEvaluator, error) { + liveLookupCache, err := lru.New(100) + if err != nil { + return nil, err + } + lw := &cache.ListWatch{ + ListFunc: func(options api.ListOptions) (runtime.Object, error) { + return client.Core().ResourceQuotas(api.NamespaceAll).List(options) + }, + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { + return client.Core().ResourceQuotas(api.NamespaceAll).Watch(options) + }, + } + indexer, reflector := cache.NewNamespaceKeyedIndexerAndReflector(lw, &api.ResourceQuota{}, 0) + + reflector.Run() + return &quotaEvaluator{ + client: client, + indexer: indexer, + registry: registry, + liveLookupCache: liveLookupCache, + liveTTL: time.Duration(30 * time.Second), + + queue: workqueue.New(), + work: map[string][]*admissionWaiter{}, + dirtyWork: map[string][]*admissionWaiter{}, + inProgress: sets.String{}, + }, nil +} + +// Run begins watching and syncing. +func (e *quotaEvaluator) Run(workers int) { + defer utilruntime.HandleCrash() + + for i := 0; i < workers; i++ { + go wait.Until(e.doWork, time.Second, make(chan struct{})) + } +} + +func (e *quotaEvaluator) doWork() { + for { + func() { + ns, admissionAttributes := e.getWork() + defer e.completeWork(ns) + if len(admissionAttributes) == 0 { + return + } + + e.checkAttributes(ns, admissionAttributes) + }() + } +} + +// checkAttributes iterates over and evaluates all the waiting admissionAttributes. It will always notify all waiters +// before returning. The default is to deny. +func (e *quotaEvaluator) checkAttributes(ns string, admissionAttributes []*admissionWaiter) { + // notify all on exit + defer func() { + for _, admissionAttribute := range admissionAttributes { + close(admissionAttribute.finished) + } + }() + + quotas, err := e.getQuotas(ns) + if err != nil { + for _, admissionAttribute := range admissionAttributes { + admissionAttribute.result = err + } + return + } + if len(quotas) == 0 { + for _, admissionAttribute := range admissionAttributes { + admissionAttribute.result = nil + } + return + } + + e.checkQuotas(quotas, admissionAttributes, 3) +} + +// checkQuotas checks the admission attributes against the passed quotas. If a quota applies, it will attempt to update it +// AFTER it has checked all the admissionAttributes. The method breaks down into phases like this: +// 0. make a copy of the quotas to act as a "running" quota so we know what we need to update and can still compare against the +// originals +// 1. check each admission attribute to see if it fits within *all* the quotas. If it doesn't fit, mark the waiter as failed +// and the running quotas don't change. If it did fit, check to see if any quota was changed. If there was no quota change, +// mark the waiter as succeeded. If some quota did change, update the running quotas +// 2. If no running quota was changed, return now since no updates are needed. +// 3. for each quota that has changed, attempt an update. If all updates succeeded, update all unset waiters to success status and return. If some +// updates failed on conflict errors and we have retries left, re-get the failed quota from our cache for the latest version +// and recurse into this method with the subset. It's safe for us to evaluate ONLY the subset, because the other quota +// documents for these waiters have already been evaluated. 
Step 1 will mark all the ones that should already have succeeded. +func (e *quotaEvaluator) checkQuotas(quotas []api.ResourceQuota, admissionAttributes []*admissionWaiter, remainingRetries int) { + // yet another copy to compare against originals to see if we actually have deltas + originalQuotas := make([]api.ResourceQuota, len(quotas), len(quotas)) + copy(originalQuotas, quotas) + + atLeastOneChanged := false + for i := range admissionAttributes { + admissionAttribute := admissionAttributes[i] + newQuotas, err := e.checkRequest(quotas, admissionAttribute.attributes) + if err != nil { + admissionAttribute.result = err + continue + } + + // if the new quotas are the same as the old quotas, then this particular one doesn't issue any updates + // that means that no quota docs applied, so it can get a pass + atLeastOneChangeForThisWaiter := false + for j := range newQuotas { + if !quota.Equals(originalQuotas[j].Status.Used, newQuotas[j].Status.Used) { + atLeastOneChanged = true + atLeastOneChangeForThisWaiter = true + break + } + } + + if !atLeastOneChangeForThisWaiter { + admissionAttribute.result = nil + } + quotas = newQuotas + } + + // if none of the requests changed anything, there's no reason to issue an update, just fail them all now + if !atLeastOneChanged { + return + } + + // now go through and try to issue updates. Things get a little weird here: + // 1. check to see if the quota changed. If not, skip. + // 2. if the quota changed and the update passes, be happy + // 3. if the quota changed and the update fails, add the original to a retry list + var updatedFailedQuotas []api.ResourceQuota + var lastErr error + for i := range quotas { + newQuota := quotas[i] + // if this quota didn't have its status changed, skip it + if quota.Equals(originalQuotas[i].Status.Used, newQuota.Status.Used) { + continue + } + + if _, err := e.client.Core().ResourceQuotas(newQuota.Namespace).UpdateStatus(&newQuota); err != nil { + updatedFailedQuotas = append(updatedFailedQuotas, newQuota) + lastErr = err + } + } + + if len(updatedFailedQuotas) == 0 { + // all the updates succeeded. At this point, anything with the default deny error was just waiting to + // get a successful update, so we can mark and notify + for _, admissionAttribute := range admissionAttributes { + if IsDefaultDeny(admissionAttribute.result) { + admissionAttribute.result = nil + } + } + return + } + + // at this point, errors are fatal. Update all waiters without status to failed and return + if remainingRetries <= 0 { + for _, admissionAttribute := range admissionAttributes { + if IsDefaultDeny(admissionAttribute.result) { + admissionAttribute.result = lastErr + } + } + return + } + + // this retry logic has the same bug that it's possible to be checking against quota in a state that never actually exists where + // you've added a new document, then updated an old one, your resource matches both and you're only checking one + // updates for these quota names failed. Get the current quotas in the namespace, compare by name, check to see if the + // resource versions have changed. If not, we're going to fall through and fail everything. If they all have, then we can try again + newQuotas, err := e.getQuotas(quotas[0].Namespace) + if err != nil { + // this means that updates failed. 
Anything with a default deny error has failed and we need to let them know + for _, admissionAttribute := range admissionAttributes { + if IsDefaultDeny(admissionAttribute.result) { + admissionAttribute.result = lastErr + } + } + return + } + + // this logic goes through our cache to find the new version of all quotas that failed update. If something has been removed + // it is skipped on this retry. After all, you removed it. + quotasToCheck := []api.ResourceQuota{} + for _, newQuota := range newQuotas { + for _, oldQuota := range updatedFailedQuotas { + if newQuota.Name == oldQuota.Name { + quotasToCheck = append(quotasToCheck, newQuota) + break + } + } + } + e.checkQuotas(quotasToCheck, admissionAttributes, remainingRetries-1) +} + +// checkRequest verifies that the request does not exceed any quota constraint. It returns a copy of the quotas, not yet persisted, +// that captures what the usage would be if the request succeeded. It returns an error if there is insufficient quota to satisfy the request +func (e *quotaEvaluator) checkRequest(quotas []api.ResourceQuota, a admission.Attributes) ([]api.ResourceQuota, error) { + namespace := a.GetNamespace() + name := a.GetName() + + evaluators := e.registry.Evaluators() + evaluator, found := evaluators[a.GetKind()] + if !found { + return quotas, nil + } + + op := a.GetOperation() + operationResources := evaluator.OperationResources(op) + if len(operationResources) == 0 { + return quotas, nil + } + + // find the set of quotas that are pertinent to this request + // reject if we match the quota, but usage is not calculated yet + // reject if the input object does not satisfy quota constraints + // if there are no pertinent quotas, we can just return + inputObject := a.GetObject() + interestingQuotaIndexes := []int{} + for i := range quotas { + resourceQuota := quotas[i] + match := evaluator.Matches(&resourceQuota, inputObject) + if !match { + continue + } + + hardResources := quota.ResourceNames(resourceQuota.Status.Hard) + evaluatorResources := evaluator.MatchesResources() + requiredResources := quota.Intersection(hardResources, evaluatorResources) + err := evaluator.Constraints(requiredResources, inputObject) + if err != nil { + return nil, admission.NewForbidden(a, fmt.Errorf("Failed quota: %s: %v", resourceQuota.Name, err)) + } + if !hasUsageStats(&resourceQuota) { + return nil, admission.NewForbidden(a, fmt.Errorf("Status unknown for quota: %s", resourceQuota.Name)) + } + + interestingQuotaIndexes = append(interestingQuotaIndexes, i) + } + if len(interestingQuotaIndexes) == 0 { + return quotas, nil + } + + // Usage of some resources cannot be counted in isolation. For example when + // the resource represents a number of unique references to an external + // resource. In such a case an evaluator needs to process other objects in + // the same namespace which needs to be known. 
+ if accessor, err := meta.Accessor(inputObject); namespace != "" && err == nil { + if accessor.GetNamespace() == "" { + accessor.SetNamespace(namespace) + } + } + + // there is at least one quota that definitely matches our object + // as a result, we need to measure the usage of this object for quota + // on updates, we need to subtract the previous measured usage + // if usage shows no change, just return since it has no impact on quota + deltaUsage := evaluator.Usage(inputObject) + if admission.Update == op { + prevItem, err := evaluator.Get(namespace, name) + if err != nil { + return nil, admission.NewForbidden(a, fmt.Errorf("Unable to get previous: %v", err)) + } + prevUsage := evaluator.Usage(prevItem) + deltaUsage = quota.Subtract(deltaUsage, prevUsage) + } + if quota.IsZero(deltaUsage) { + return quotas, nil + } + + for _, index := range interestingQuotaIndexes { + resourceQuota := quotas[index] + + hardResources := quota.ResourceNames(resourceQuota.Status.Hard) + requestedUsage := quota.Mask(deltaUsage, hardResources) + newUsage := quota.Add(resourceQuota.Status.Used, requestedUsage) + if allowed, exceeded := quota.LessThanOrEqual(newUsage, resourceQuota.Status.Hard); !allowed { + failedRequestedUsage := quota.Mask(requestedUsage, exceeded) + failedUsed := quota.Mask(resourceQuota.Status.Used, exceeded) + failedHard := quota.Mask(resourceQuota.Status.Hard, exceeded) + return nil, admission.NewForbidden(a, + fmt.Errorf("Exceeded quota: %s, requested: %s, used: %s, limited: %s", + resourceQuota.Name, + prettyPrint(failedRequestedUsage), + prettyPrint(failedUsed), + prettyPrint(failedHard))) + } + + // update to the new usage number + quotas[index].Status.Used = newUsage + } + + return quotas, nil +} + +func (e *quotaEvaluator) evaluate(a admission.Attributes) error { + waiter := newAdmissionWaiter(a) + + e.addWork(waiter) + + // wait for completion or timeout + select { + case <-waiter.finished: + case <-time.After(10 * time.Second): + return fmt.Errorf("timeout") + } + + return waiter.result +} + +func (e *quotaEvaluator) addWork(a *admissionWaiter) { + e.workLock.Lock() + defer e.workLock.Unlock() + + ns := a.attributes.GetNamespace() + // this Add can trigger a Get BEFORE the work is added to a list, but this is ok because the getWork routine + // waits the worklock before retrieving the work to do, so the writes in this method will be observed + e.queue.Add(ns) + + if e.inProgress.Has(ns) { + e.dirtyWork[ns] = append(e.dirtyWork[ns], a) + return + } + + e.work[ns] = append(e.work[ns], a) +} + +func (e *quotaEvaluator) completeWork(ns string) { + e.workLock.Lock() + defer e.workLock.Unlock() + + e.queue.Done(ns) + e.work[ns] = e.dirtyWork[ns] + delete(e.dirtyWork, ns) + e.inProgress.Delete(ns) +} + +func (e *quotaEvaluator) getWork() (string, []*admissionWaiter) { + uncastNS, _ := e.queue.Get() + ns := uncastNS.(string) + + e.workLock.Lock() + defer e.workLock.Unlock() + // at this point, we know we have a coherent view of e.work. It is entirely possible + // that our workqueue has another item requeued to it, but we'll pick it up early. 
This ok + // because the next time will go into our dirty list + + work := e.work[ns] + delete(e.work, ns) + delete(e.dirtyWork, ns) + + if len(work) != 0 { + e.inProgress.Insert(ns) + return ns, work + } + + e.queue.Done(ns) + e.inProgress.Delete(ns) + return ns, []*admissionWaiter{} +} + +func (e *quotaEvaluator) getQuotas(namespace string) ([]api.ResourceQuota, error) { + // determine if there are any quotas in this namespace + // if there are no quotas, we don't need to do anything + items, err := e.indexer.Index("namespace", &api.ResourceQuota{ObjectMeta: api.ObjectMeta{Namespace: namespace, Name: ""}}) + if err != nil { + return nil, fmt.Errorf("Error resolving quota.") + } + + // if there are no items held in our indexer, check our live-lookup LRU, if that misses, do the live lookup to prime it. + if len(items) == 0 { + lruItemObj, ok := e.liveLookupCache.Get(namespace) + if !ok || lruItemObj.(liveLookupEntry).expiry.Before(time.Now()) { + // TODO: If there are multiple operations at the same time and cache has just expired, + // this may cause multiple List operations being issued at the same time. + // If there is already in-flight List() for a given namespace, we should wait until + // it is finished and cache is updated instead of doing the same, also to avoid + // throttling - see #22422 for details. + liveList, err := e.client.Core().ResourceQuotas(namespace).List(api.ListOptions{}) + if err != nil { + return nil, err + } + newEntry := liveLookupEntry{expiry: time.Now().Add(e.liveTTL)} + for i := range liveList.Items { + newEntry.items = append(newEntry.items, &liveList.Items[i]) + } + e.liveLookupCache.Add(namespace, newEntry) + lruItemObj = newEntry + } + lruEntry := lruItemObj.(liveLookupEntry) + for i := range lruEntry.items { + items = append(items, lruEntry.items[i]) + } + } + + resourceQuotas := []api.ResourceQuota{} + for i := range items { + // always make a copy. We're going to muck around with this and we should never mutate the originals + resourceQuotas = append(resourceQuotas, *items[i].(*api.ResourceQuota)) + } + + return resourceQuotas, nil +} diff --git a/test/integration/quota_test.go b/test/integration/quota_test.go new file mode 100644 index 00000000000..9d4053f5e02 --- /dev/null +++ b/test/integration/quota_test.go @@ -0,0 +1,205 @@ +// +build integration,!no-etcd + +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package integration + +import ( + "net/http" + "net/http/httptest" + "testing" + "time" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/resource" + "k8s.io/kubernetes/pkg/api/testapi" + "k8s.io/kubernetes/pkg/api/unversioned" + clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" + "k8s.io/kubernetes/pkg/client/restclient" + "k8s.io/kubernetes/pkg/controller" + replicationcontroller "k8s.io/kubernetes/pkg/controller/replication" + resourcequotacontroller "k8s.io/kubernetes/pkg/controller/resourcequota" + "k8s.io/kubernetes/pkg/fields" + "k8s.io/kubernetes/pkg/labels" + "k8s.io/kubernetes/pkg/master" + quotainstall "k8s.io/kubernetes/pkg/quota/install" + "k8s.io/kubernetes/pkg/util/wait" + "k8s.io/kubernetes/pkg/watch" + "k8s.io/kubernetes/plugin/pkg/admission/resourcequota" + "k8s.io/kubernetes/test/integration/framework" +) + +// 1.2 code gets: +// quota_test.go:95: Took 4.218619579s to scale up without quota +// quota_test.go:199: unexpected error: timed out waiting for the condition, ended with 342 pods (1 minute) +// 1.3+ code gets: +// quota_test.go:100: Took 4.196205966s to scale up without quota +// quota_test.go:115: Took 12.021640372s to scale up with quota +func TestQuota(t *testing.T) { + framework.DeleteAllEtcdKeys() + + // Set up a master + var m *master.Master + s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + m.Handler.ServeHTTP(w, req) + })) + // TODO: Uncomment when fix #19254 + // defer s.Close() + clientset := clientset.NewForConfigOrDie(&restclient.Config{QPS: -1, Host: s.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) + admission, err := resourcequota.NewResourceQuota(clientset, quotainstall.NewRegistry(clientset), 5) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + masterConfig := framework.NewIntegrationTestMasterConfig() + masterConfig.AdmissionControl = admission + m, err = master.New(masterConfig) + if err != nil { + t.Fatalf("Error in bringing up the master: %v", err) + } + + go replicationcontroller.NewReplicationManagerFromClient(clientset, controller.NoResyncPeriodFunc, replicationcontroller.BurstReplicas, 4096). 
+ Run(3, wait.NeverStop) + + resourceQuotaRegistry := quotainstall.NewRegistry(clientset) + groupKindsToReplenish := []unversioned.GroupKind{ + api.Kind("Pod"), + api.Kind("Service"), + api.Kind("ReplicationController"), + api.Kind("PersistentVolumeClaim"), + api.Kind("Secret"), + } + resourceQuotaControllerOptions := &resourcequotacontroller.ResourceQuotaControllerOptions{ + KubeClient: clientset, + ResyncPeriod: controller.NoResyncPeriodFunc, + Registry: resourceQuotaRegistry, + GroupKindsToReplenish: groupKindsToReplenish, + ReplenishmentResyncPeriod: controller.NoResyncPeriodFunc, + ControllerFactory: resourcequotacontroller.NewReplenishmentControllerFactoryFromClient(clientset), + } + go resourcequotacontroller.NewResourceQuotaController(resourceQuotaControllerOptions).Run(2, wait.NeverStop) + + startTime := time.Now() + scale(t, api.NamespaceDefault, clientset) + endTime := time.Now() + t.Logf("Took %v to scale up without quota", endTime.Sub(startTime)) + + quota := &api.ResourceQuota{ + ObjectMeta: api.ObjectMeta{Name: "quota"}, + Spec: api.ResourceQuotaSpec{ + Hard: api.ResourceList{ + api.ResourcePods: resource.MustParse("1000"), + }, + }, + } + waitForQuota(t, quota, clientset) + + startTime = time.Now() + scale(t, "quotaed", clientset) + endTime = time.Now() + t.Logf("Took %v to scale up with quota", endTime.Sub(startTime)) + +} +func waitForQuota(t *testing.T, quota *api.ResourceQuota, clientset *clientset.Clientset) { + w, err := clientset.Core().ResourceQuotas(quota.Namespace).Watch(api.SingleObject(api.ObjectMeta{Name: quota.Name})) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if _, err := clientset.Core().ResourceQuotas("quotaed").Create(quota); err != nil { + t.Fatalf("unexpected error: %v", err) + } + + _, err = watch.Until(1*time.Minute, w, func(event watch.Event) (bool, error) { + switch event.Type { + case watch.Modified: + default: + return false, nil + } + + switch cast := event.Object.(type) { + case *api.ResourceQuota: + if len(cast.Status.Hard) > 0 { + return true, nil + } + } + + return false, nil + }) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } +} + +func scale(t *testing.T, namespace string, clientset *clientset.Clientset) { + target := 1000 + rc := &api.ReplicationController{ + ObjectMeta: api.ObjectMeta{ + Name: "foo", + Namespace: namespace, + }, + Spec: api.ReplicationControllerSpec{ + Replicas: target, + Selector: map[string]string{"foo": "bar"}, + Template: &api.PodTemplateSpec{ + ObjectMeta: api.ObjectMeta{ + Labels: map[string]string{ + "foo": "bar", + }, + }, + Spec: api.PodSpec{ + Containers: []api.Container{ + { + Name: "container", + Image: "busybox", + }, + }, + }, + }, + }, + } + + w, err := clientset.Core().ReplicationControllers(namespace).Watch(api.SingleObject(api.ObjectMeta{Name: rc.Name})) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if _, err := clientset.Core().ReplicationControllers(namespace).Create(rc); err != nil { + t.Fatalf("unexpected error: %v", err) + } + + _, err = watch.Until(3*time.Minute, w, func(event watch.Event) (bool, error) { + switch event.Type { + case watch.Modified: + default: + return false, nil + } + + switch cast := event.Object.(type) { + case *api.ReplicationController: + if cast.Status.Replicas == target { + return true, nil + } + } + + return false, nil + }) + if err != nil { + pods, _ := clientset.Core().Pods(namespace).List(api.ListOptions{LabelSelector: labels.Everything(), FieldSelector: fields.Everything()}) + t.Fatalf("unexpected error: %v, ended 
with %v pods", err, len(pods.Items)) + } +}
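
The heart of controller.go above is the per-namespace bucketing in addWork/getWork/completeWork: concurrent admission requests for one namespace are batched, checked against a single snapshot of that namespace's quotas, and written back with as few UpdateStatus calls as possible, which is what reduces the conflict/retry churn of the old per-request loop in admission.go. The standalone Go sketch below illustrates only that bucketing pattern under simplified assumptions: the batcher type, plain string items, and the slice-plus-condvar queue are hypothetical stand-ins for the real pkg/util/workqueue and admissionWaiter plumbing, and no quota evaluation is performed.

package main

import (
	"fmt"
	"sync"
)

// batcher buckets incoming items by namespace so a worker can take a whole
// batch at once. Illustrative only; not the Kubernetes implementation.
type batcher struct {
	mu         sync.Mutex
	cond       *sync.Cond
	queue      []string            // namespaces with pending work, FIFO
	queued     map[string]bool     // namespaces currently sitting in queue
	work       map[string][]string // batches awaiting a worker, keyed by namespace
	dirtyWork  map[string][]string // items that arrived while that namespace was being processed
	inProgress map[string]bool     // namespaces a worker is currently evaluating
}

func newBatcher() *batcher {
	b := &batcher{
		queued:     map[string]bool{},
		work:       map[string][]string{},
		dirtyWork:  map[string][]string{},
		inProgress: map[string]bool{},
	}
	b.cond = sync.NewCond(&b.mu)
	return b
}

// add enqueues one item under its namespace.
func (b *batcher) add(ns, item string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.inProgress[ns] {
		// A batch for this namespace is being evaluated right now; park the item
		// so the running pass keeps a coherent view, and pick it up afterwards.
		b.dirtyWork[ns] = append(b.dirtyWork[ns], item)
		return
	}
	b.work[ns] = append(b.work[ns], item)
	if !b.queued[ns] {
		b.queue = append(b.queue, ns)
		b.queued[ns] = true
	}
	b.cond.Signal()
}

// getWork blocks until some namespace has pending work, then hands the whole
// batch to the caller and marks that namespace in progress.
func (b *batcher) getWork() (string, []string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	for len(b.queue) == 0 {
		b.cond.Wait()
	}
	ns := b.queue[0]
	b.queue = b.queue[1:]
	delete(b.queued, ns)
	items := b.work[ns]
	delete(b.work, ns)
	b.inProgress[ns] = true
	return ns, items
}

// completeWork promotes anything that arrived during the pass (dirtyWork) into
// work and requeues the namespace so the next pass picks it up.
func (b *batcher) completeWork(ns string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	delete(b.inProgress, ns)
	if pending := b.dirtyWork[ns]; len(pending) > 0 {
		b.work[ns] = pending
		delete(b.dirtyWork, ns)
		if !b.queued[ns] {
			b.queue = append(b.queue, ns)
			b.queued[ns] = true
		}
		b.cond.Signal()
	}
}

func main() {
	b := newBatcher()
	b.add("ns1", "pod-a")
	b.add("ns1", "pod-b") // same namespace, same batch

	ns, batch := b.getWork()
	b.add("ns1", "pod-c") // arrives mid-pass: parked in dirtyWork
	fmt.Println(ns, batch) // ns1 [pod-a pod-b]
	b.completeWork(ns)     // promotes pod-c for the next pass

	ns, batch = b.getWork()
	fmt.Println(ns, batch) // ns1 [pod-c]
	b.completeWork(ns)
}

The dirtyWork map is the design point worth noting: items that arrive while a namespace is mid-evaluation are deliberately held back so the running pass sees a coherent batch, then promoted when completeWork runs.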