use single writer to improve quota performance
This commit is contained in: parent daf6be1a66, commit d3c6363093
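The change below funnels ResourceQuota admission checks through a small pool of evaluator workers that batch waiting requests per namespace, so quota status for a namespace is effectively written by a single writer instead of by every API request racing to update it. The following is a minimal, hypothetical Go sketch of that batching idea only; the batcher/waiter names and the trivial "admit everything" logic are invented for illustration and are not the Kubernetes types used in the diff.

package main

import (
	"fmt"
	"sync"
	"time"
)

// waiter is one pending admission request; the caller blocks on finished.
type waiter struct {
	podName  string
	result   error
	finished chan struct{}
}

// batcher groups pending waiters by namespace so a worker can handle a whole
// namespace at once, which is what keeps quota writes down to one writer.
type batcher struct {
	mu   sync.Mutex
	cond *sync.Cond
	work map[string][]*waiter
}

func newBatcher() *batcher {
	b := &batcher{work: map[string][]*waiter{}}
	b.cond = sync.NewCond(&b.mu)
	return b
}

// evaluate enqueues a request and waits until a worker has processed the
// batch for its namespace, or a timeout elapses.
func (b *batcher) evaluate(ns, pod string) error {
	w := &waiter{podName: pod, finished: make(chan struct{})}
	b.mu.Lock()
	b.work[ns] = append(b.work[ns], w)
	b.mu.Unlock()
	b.cond.Signal()

	select {
	case <-w.finished:
		return w.result
	case <-time.After(10 * time.Second):
		return fmt.Errorf("timed out waiting for quota evaluation")
	}
}

// run drains one namespace at a time; a real evaluator would check the whole
// batch against the namespace's quotas and issue a single status update.
func (b *batcher) run() {
	for {
		b.mu.Lock()
		for len(b.work) == 0 {
			b.cond.Wait()
		}
		var ns string
		var batch []*waiter
		for k, v := range b.work {
			ns, batch = k, v
			break
		}
		delete(b.work, ns)
		b.mu.Unlock()

		for _, w := range batch {
			w.result = nil // toy logic: admit everything
			close(w.finished)
		}
		fmt.Printf("handled %d request(s) for namespace %q\n", len(batch), ns)
	}
}

func main() {
	b := newBatcher()
	go b.run()

	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			_ = b.evaluate("demo", fmt.Sprintf("pod-%d", i))
		}(i)
	}
	wg.Wait()
}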
@ -17,50 +17,31 @@ limitations under the License.
|
||||
package resourcequota
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"math/rand"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/golang-lru"
|
||||
|
||||
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
|
||||
|
||||
"k8s.io/kubernetes/pkg/admission"
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/errors"
|
||||
"k8s.io/kubernetes/pkg/api/meta"
|
||||
"k8s.io/kubernetes/pkg/client/cache"
|
||||
"k8s.io/kubernetes/pkg/quota"
|
||||
"k8s.io/kubernetes/pkg/quota/install"
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
"k8s.io/kubernetes/pkg/watch"
|
||||
)
|
||||
|
||||
func init() {
|
||||
admission.RegisterPlugin("ResourceQuota",
|
||||
func(client clientset.Interface, config io.Reader) (admission.Interface, error) {
|
||||
registry := install.NewRegistry(client)
|
||||
return NewResourceQuota(client, registry)
|
||||
return NewResourceQuota(client, registry, 5)
|
||||
})
|
||||
}
|
||||
|
||||
// quotaAdmission implements an admission controller that can enforce quota constraints
|
||||
type quotaAdmission struct {
|
||||
*admission.Handler
|
||||
// must be able to read/write ResourceQuota
|
||||
client clientset.Interface
|
||||
// indexer that holds quota objects by namespace
|
||||
indexer cache.Indexer
|
||||
// registry that knows how to measure usage for objects
|
||||
registry quota.Registry
|
||||
|
||||
// liveLookups holds the last few live lookups we've done to help amortize cost on repeated lookup failures.
|
||||
// This lets us handle the case of latent caches by looking up actual results for a namespace on a cache miss / no results.
|
||||
// We track the lookup result here so that for repeated requests, we don't look it up very often.
|
||||
liveLookupCache *lru.Cache
|
||||
liveTTL time.Duration
|
||||
evaluator *quotaEvaluator
|
||||
}
|
||||
|
||||
type liveLookupEntry struct {
|
||||
@ -71,28 +52,16 @@ type liveLookupEntry struct {
|
||||
// NewResourceQuota configures an admission controller that can enforce quota constraints
|
||||
// using the provided registry. The registry must have the capability to handle group/kinds that
|
||||
// are persisted by the server this admission controller is intercepting
|
||||
func NewResourceQuota(client clientset.Interface, registry quota.Registry) (admission.Interface, error) {
|
||||
liveLookupCache, err := lru.New(100)
|
||||
func NewResourceQuota(client clientset.Interface, registry quota.Registry, numEvaluators int) (admission.Interface, error) {
|
||||
evaluator, err := newQuotaEvaluator(client, registry)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
lw := &cache.ListWatch{
|
||||
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
|
||||
return client.Core().ResourceQuotas(api.NamespaceAll).List(options)
|
||||
},
|
||||
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
|
||||
return client.Core().ResourceQuotas(api.NamespaceAll).Watch(options)
|
||||
},
|
||||
}
|
||||
indexer, reflector := cache.NewNamespaceKeyedIndexerAndReflector(lw, &api.ResourceQuota{}, 0)
|
||||
reflector.Run()
|
||||
evaluator.Run(numEvaluators)
|
||||
|
||||
return "aAdmission{
|
||||
Handler: admission.NewHandler(admission.Create, admission.Update),
|
||||
client: client,
|
||||
indexer: indexer,
|
||||
registry: registry,
|
||||
liveLookupCache: liveLookupCache,
|
||||
liveTTL: time.Duration(30 * time.Second),
|
||||
Handler: admission.NewHandler(admission.Create, admission.Update),
|
||||
evaluator: evaluator,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@ -104,7 +73,7 @@ func (q *quotaAdmission) Admit(a admission.Attributes) (err error) {
|
||||
}
|
||||
|
||||
// if we do not know how to evaluate use for this kind, just ignore
|
||||
evaluators := q.registry.Evaluators()
|
||||
evaluators := q.evaluator.registry.Evaluators()
|
||||
evaluator, found := evaluators[a.GetKind()]
|
||||
if !found {
|
||||
return nil
|
||||
@ -118,183 +87,7 @@ func (q *quotaAdmission) Admit(a admission.Attributes) (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// determine if there are any quotas in this namespace
|
||||
// if there are no quotas, we don't need to do anything
|
||||
namespace, name := a.GetNamespace(), a.GetName()
|
||||
items, err := q.indexer.Index("namespace", &api.ResourceQuota{ObjectMeta: api.ObjectMeta{Namespace: namespace, Name: ""}})
|
||||
if err != nil {
|
||||
return admission.NewForbidden(a, fmt.Errorf("Error resolving quota."))
|
||||
}
|
||||
// if there are no items held in our indexer, check our live-lookup LRU, if that misses, do the live lookup to prime it.
|
||||
if len(items) == 0 {
|
||||
lruItemObj, ok := q.liveLookupCache.Get(a.GetNamespace())
|
||||
if !ok || lruItemObj.(liveLookupEntry).expiry.Before(time.Now()) {
|
||||
// TODO: If there are multiple operations at the same time and cache has just expired,
|
||||
// this may cause multiple List operations being issued at the same time.
|
||||
// If there is already in-flight List() for a given namespace, we should wait until
|
||||
// it is finished and cache is updated instead of doing the same, also to avoid
|
||||
// throttling - see #22422 for details.
|
||||
liveList, err := q.client.Core().ResourceQuotas(namespace).List(api.ListOptions{})
|
||||
if err != nil {
|
||||
return admission.NewForbidden(a, err)
|
||||
}
|
||||
newEntry := liveLookupEntry{expiry: time.Now().Add(q.liveTTL)}
|
||||
for i := range liveList.Items {
|
||||
newEntry.items = append(newEntry.items, &liveList.Items[i])
|
||||
}
|
||||
q.liveLookupCache.Add(a.GetNamespace(), newEntry)
|
||||
lruItemObj = newEntry
|
||||
}
|
||||
lruEntry := lruItemObj.(liveLookupEntry)
|
||||
for i := range lruEntry.items {
|
||||
items = append(items, lruEntry.items[i])
|
||||
}
|
||||
}
|
||||
// if there are still no items, we can return
|
||||
if len(items) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// find the set of quotas that are pertinent to this request
|
||||
// reject if we match the quota, but usage is not calculated yet
|
||||
// reject if the input object does not satisfy quota constraints
|
||||
// if there are no pertinent quotas, we can just return
|
||||
inputObject := a.GetObject()
|
||||
resourceQuotas := []*api.ResourceQuota{}
|
||||
for i := range items {
|
||||
resourceQuota := items[i].(*api.ResourceQuota)
|
||||
match := evaluator.Matches(resourceQuota, inputObject)
|
||||
if !match {
|
||||
continue
|
||||
}
|
||||
hardResources := quota.ResourceNames(resourceQuota.Status.Hard)
|
||||
evaluatorResources := evaluator.MatchesResources()
|
||||
requiredResources := quota.Intersection(hardResources, evaluatorResources)
|
||||
err := evaluator.Constraints(requiredResources, inputObject)
|
||||
if err != nil {
|
||||
return admission.NewForbidden(a, fmt.Errorf("Failed quota: %s: %v", resourceQuota.Name, err))
|
||||
}
|
||||
if !hasUsageStats(resourceQuota) {
|
||||
return admission.NewForbidden(a, fmt.Errorf("status unknown for quota: %s", resourceQuota.Name))
|
||||
}
|
||||
resourceQuotas = append(resourceQuotas, resourceQuota)
|
||||
}
|
||||
if len(resourceQuotas) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Usage of some resources cannot be counted in isolation. For example when
|
||||
// the resource represents a number of unique references to external
|
||||
// resource. In such a case an evaluator needs to process other objects in
|
||||
// the same namespace which needs to be known.
|
||||
if accessor, err := meta.Accessor(inputObject); namespace != "" && err == nil {
|
||||
if accessor.GetNamespace() == "" {
|
||||
accessor.SetNamespace(namespace)
|
||||
}
|
||||
}
|
||||
|
||||
// there is at least one quota that definitely matches our object
|
||||
// as a result, we need to measure the usage of this object for quota
|
||||
// on updates, we need to subtract the previous measured usage
|
||||
// if usage shows no change, just return since it has no impact on quota
|
||||
deltaUsage := evaluator.Usage(inputObject)
|
||||
if admission.Update == op {
|
||||
prevItem, err := evaluator.Get(namespace, name)
|
||||
if err != nil {
|
||||
return admission.NewForbidden(a, fmt.Errorf("Unable to get previous: %v", err))
|
||||
}
|
||||
prevUsage := evaluator.Usage(prevItem)
|
||||
deltaUsage = quota.Subtract(deltaUsage, prevUsage)
|
||||
}
|
||||
if quota.IsZero(deltaUsage) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// TODO: Move to a bucketing work queue
|
||||
// If we guaranteed that we processed requests in the order they were received by the server, we would reduce quota conflicts.
|
||||
// Until we have the bucketing work queue, we jitter requests and retry on conflict.
|
||||
numRetries := 10
|
||||
interval := time.Duration(rand.Int63n(90)+int64(10)) * time.Millisecond
|
||||
|
||||
// seed the retry loop with the initial set of quotas to process (should reduce each iteration)
|
||||
resourceQuotasToProcess := resourceQuotas
|
||||
for retry := 1; retry <= numRetries; retry++ {
|
||||
// the list of quotas we will try again if there is a version conflict
|
||||
tryAgain := []*api.ResourceQuota{}
|
||||
|
||||
// check that we pass all remaining quotas so we do not prematurely charge
|
||||
// for each quota, mask the usage to the set of resources tracked by the quota
|
||||
// if request + used > hard, return an error describing the failure
|
||||
updatedUsage := map[string]api.ResourceList{}
|
||||
for _, resourceQuota := range resourceQuotasToProcess {
|
||||
hardResources := quota.ResourceNames(resourceQuota.Status.Hard)
|
||||
requestedUsage := quota.Mask(deltaUsage, hardResources)
|
||||
newUsage := quota.Add(resourceQuota.Status.Used, requestedUsage)
|
||||
if allowed, exceeded := quota.LessThanOrEqual(newUsage, resourceQuota.Status.Hard); !allowed {
|
||||
failedRequestedUsage := quota.Mask(requestedUsage, exceeded)
|
||||
failedUsed := quota.Mask(resourceQuota.Status.Used, exceeded)
|
||||
failedHard := quota.Mask(resourceQuota.Status.Hard, exceeded)
|
||||
return admission.NewForbidden(a,
|
||||
fmt.Errorf("exceeded quota: %s, requested: %s, used: %s, limited: %s",
|
||||
resourceQuota.Name,
|
||||
prettyPrint(failedRequestedUsage),
|
||||
prettyPrint(failedUsed),
|
||||
prettyPrint(failedHard)))
|
||||
}
|
||||
updatedUsage[resourceQuota.Name] = newUsage
|
||||
}
|
||||
|
||||
// update the status for each quota with its new usage
|
||||
// if we get a conflict, get updated quota, and enqueue
|
||||
for i, resourceQuota := range resourceQuotasToProcess {
|
||||
newUsage := updatedUsage[resourceQuota.Name]
|
||||
quotaToUpdate := &api.ResourceQuota{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: resourceQuota.Name,
|
||||
Namespace: resourceQuota.Namespace,
|
||||
ResourceVersion: resourceQuota.ResourceVersion,
|
||||
},
|
||||
Status: api.ResourceQuotaStatus{
|
||||
Hard: quota.Add(api.ResourceList{}, resourceQuota.Status.Hard),
|
||||
Used: newUsage,
|
||||
},
|
||||
}
|
||||
_, err = q.client.Core().ResourceQuotas(quotaToUpdate.Namespace).UpdateStatus(quotaToUpdate)
|
||||
if err != nil {
|
||||
if !errors.IsConflict(err) {
|
||||
return admission.NewForbidden(a, fmt.Errorf("Unable to update quota status: %s %v", resourceQuota.Name, err))
|
||||
}
|
||||
// if we get a conflict, we get the latest copy of the quota documents that were not yet modified so we retry all with latest state.
|
||||
for fetchIndex := i; fetchIndex < len(resourceQuotasToProcess); fetchIndex++ {
|
||||
latestQuota, err := q.client.Core().ResourceQuotas(namespace).Get(resourceQuotasToProcess[fetchIndex].Name)
|
||||
if err != nil {
|
||||
return admission.NewForbidden(a, fmt.Errorf("Unable to get quota: %s %v", resourceQuotasToProcess[fetchIndex].Name, err))
|
||||
}
|
||||
tryAgain = append(tryAgain, latestQuota)
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// all quotas were updated, so we can return
|
||||
if len(tryAgain) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// we have concurrent requests to update quota, so look to retry if needed
|
||||
// next iteration, we need to process the items that have to try again
|
||||
// pause the specified interval to encourage jitter
|
||||
if retry == numRetries {
|
||||
names := []string{}
|
||||
for _, quota := range tryAgain {
|
||||
names = append(names, quota.Name)
|
||||
}
|
||||
return admission.NewForbidden(a, fmt.Errorf("Unable to update status for quota: %s, ", strings.Join(names, ",")))
|
||||
}
|
||||
resourceQuotasToProcess = tryAgain
|
||||
time.Sleep(interval)
|
||||
}
|
||||
return nil
|
||||
return q.evaluator.evaluate(a)
|
||||
}
|
||||
|
||||
// prettyPrint formats a resource list for usage in errors
|
||||
|
@ -75,7 +75,7 @@ func validPod(name string, numContainers int, resources api.ResourceRequirements
|
||||
// TestAdmissionIgnoresDelete verifies that the admission controller ignores delete operations
|
||||
func TestAdmissionIgnoresDelete(t *testing.T) {
|
||||
kubeClient := fake.NewSimpleClientset()
|
||||
handler, err := NewResourceQuota(kubeClient, install.NewRegistry(kubeClient))
|
||||
handler, err := NewResourceQuota(kubeClient, install.NewRegistry(kubeClient), 5)
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error %v", err)
|
||||
}
|
||||
@ -101,13 +101,14 @@ func TestAdmissionIgnoresSubresources(t *testing.T) {
|
||||
resourceQuota.Status.Used[api.ResourceMemory] = resource.MustParse("1Gi")
|
||||
kubeClient := fake.NewSimpleClientset(resourceQuota)
|
||||
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc})
|
||||
evaluator, _ := newQuotaEvaluator(kubeClient, install.NewRegistry(kubeClient))
|
||||
evaluator.indexer = indexer
|
||||
evaluator.Run(5)
|
||||
handler := "aAdmission{
|
||||
Handler: admission.NewHandler(admission.Create, admission.Update),
|
||||
client: kubeClient,
|
||||
indexer: indexer,
|
||||
registry: install.NewRegistry(kubeClient),
|
||||
Handler: admission.NewHandler(admission.Create, admission.Update),
|
||||
evaluator: evaluator,
|
||||
}
|
||||
handler.indexer.Add(resourceQuota)
|
||||
indexer.Add(resourceQuota)
|
||||
newPod := validPod("123", 1, getResourceRequirements(getResourceList("100m", "2Gi"), getResourceList("", "")))
|
||||
err := handler.Admit(admission.NewAttributesRecord(newPod, api.Kind("Pod"), newPod.Namespace, newPod.Name, api.Resource("pods"), "", admission.Create, nil))
|
||||
if err == nil {
|
||||
@ -138,13 +139,14 @@ func TestAdmitBelowQuotaLimit(t *testing.T) {
|
||||
}
|
||||
kubeClient := fake.NewSimpleClientset(resourceQuota)
|
||||
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc})
|
||||
evaluator, _ := newQuotaEvaluator(kubeClient, install.NewRegistry(kubeClient))
|
||||
evaluator.indexer = indexer
|
||||
evaluator.Run(5)
|
||||
handler := "aAdmission{
|
||||
Handler: admission.NewHandler(admission.Create, admission.Update),
|
||||
client: kubeClient,
|
||||
indexer: indexer,
|
||||
registry: install.NewRegistry(kubeClient),
|
||||
Handler: admission.NewHandler(admission.Create, admission.Update),
|
||||
evaluator: evaluator,
|
||||
}
|
||||
handler.indexer.Add(resourceQuota)
|
||||
indexer.Add(resourceQuota)
|
||||
newPod := validPod("allowed-pod", 1, getResourceRequirements(getResourceList("100m", "2Gi"), getResourceList("", "")))
|
||||
err := handler.Admit(admission.NewAttributesRecord(newPod, api.Kind("Pod"), newPod.Namespace, newPod.Name, api.Resource("pods"), "", admission.Create, nil))
|
||||
if err != nil {
|
||||
@ -165,8 +167,9 @@ func TestAdmitBelowQuotaLimit(t *testing.T) {
|
||||
t.Errorf("Expected actions:\n%v\n but got:\n%v\nDifference:\n%v", expectedActionSet, actionSet, expectedActionSet.Difference(actionSet))
|
||||
}
|
||||
|
||||
lastActionIndex := len(kubeClient.Actions()) - 1
|
||||
usage := kubeClient.Actions()[lastActionIndex].(testcore.UpdateAction).GetObject().(*api.ResourceQuota)
|
||||
decimatedActions := removeListWatch(kubeClient.Actions())
|
||||
lastActionIndex := len(decimatedActions) - 1
|
||||
usage := decimatedActions[lastActionIndex].(testcore.UpdateAction).GetObject().(*api.ResourceQuota)
|
||||
expectedUsage := api.ResourceQuota{
|
||||
Status: api.ResourceQuotaStatus{
|
||||
Hard: api.ResourceList{
|
||||
@ -210,13 +213,14 @@ func TestAdmitExceedQuotaLimit(t *testing.T) {
|
||||
}
|
||||
kubeClient := fake.NewSimpleClientset(resourceQuota)
|
||||
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc})
|
||||
evaluator, _ := newQuotaEvaluator(kubeClient, install.NewRegistry(kubeClient))
|
||||
evaluator.indexer = indexer
|
||||
evaluator.Run(5)
|
||||
handler := "aAdmission{
|
||||
Handler: admission.NewHandler(admission.Create, admission.Update),
|
||||
client: kubeClient,
|
||||
indexer: indexer,
|
||||
registry: install.NewRegistry(kubeClient),
|
||||
Handler: admission.NewHandler(admission.Create, admission.Update),
|
||||
evaluator: evaluator,
|
||||
}
|
||||
handler.indexer.Add(resourceQuota)
|
||||
indexer.Add(resourceQuota)
|
||||
newPod := validPod("not-allowed-pod", 1, getResourceRequirements(getResourceList("3", "2Gi"), getResourceList("", "")))
|
||||
err := handler.Admit(admission.NewAttributesRecord(newPod, api.Kind("Pod"), newPod.Namespace, newPod.Name, api.Resource("pods"), "", admission.Create, nil))
|
||||
if err == nil {
|
||||
@ -247,13 +251,14 @@ func TestAdmitEnforceQuotaConstraints(t *testing.T) {
|
||||
}
|
||||
kubeClient := fake.NewSimpleClientset(resourceQuota)
|
||||
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc})
|
||||
evaluator, _ := newQuotaEvaluator(kubeClient, install.NewRegistry(kubeClient))
|
||||
evaluator.indexer = indexer
|
||||
evaluator.Run(5)
|
||||
handler := "aAdmission{
|
||||
Handler: admission.NewHandler(admission.Create, admission.Update),
|
||||
client: kubeClient,
|
||||
indexer: indexer,
|
||||
registry: install.NewRegistry(kubeClient),
|
||||
Handler: admission.NewHandler(admission.Create, admission.Update),
|
||||
evaluator: evaluator,
|
||||
}
|
||||
handler.indexer.Add(resourceQuota)
|
||||
indexer.Add(resourceQuota)
|
||||
newPod := validPod("not-allowed-pod", 1, getResourceRequirements(getResourceList("100m", "2Gi"), getResourceList("200m", "")))
|
||||
err := handler.Admit(admission.NewAttributesRecord(newPod, api.Kind("Pod"), newPod.Namespace, newPod.Name, api.Resource("pods"), "", admission.Create, nil))
|
||||
if err == nil {
|
||||
@ -286,15 +291,16 @@ func TestAdmitPodInNamespaceWithoutQuota(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
evaluator, _ := newQuotaEvaluator(kubeClient, install.NewRegistry(kubeClient))
|
||||
evaluator.indexer = indexer
|
||||
evaluator.liveLookupCache = liveLookupCache
|
||||
evaluator.Run(5)
|
||||
handler := "aAdmission{
|
||||
Handler: admission.NewHandler(admission.Create, admission.Update),
|
||||
client: kubeClient,
|
||||
indexer: indexer,
|
||||
registry: install.NewRegistry(kubeClient),
|
||||
liveLookupCache: liveLookupCache,
|
||||
Handler: admission.NewHandler(admission.Create, admission.Update),
|
||||
evaluator: evaluator,
|
||||
}
|
||||
// Add to the index
|
||||
handler.indexer.Add(resourceQuota)
|
||||
indexer.Add(resourceQuota)
|
||||
newPod := validPod("not-allowed-pod", 1, getResourceRequirements(getResourceList("100m", "2Gi"), getResourceList("200m", "")))
|
||||
// Add to the lru cache so we do not do a live client lookup
|
||||
liveLookupCache.Add(newPod.Namespace, liveLookupEntry{expiry: time.Now().Add(time.Duration(30 * time.Second)), items: []*api.ResourceQuota{}})
|
||||
@ -346,14 +352,15 @@ func TestAdmitBelowTerminatingQuotaLimit(t *testing.T) {
|
||||
}
|
||||
kubeClient := fake.NewSimpleClientset(resourceQuotaTerminating, resourceQuotaNonTerminating)
|
||||
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc})
|
||||
evaluator, _ := newQuotaEvaluator(kubeClient, install.NewRegistry(kubeClient))
|
||||
evaluator.indexer = indexer
|
||||
evaluator.Run(5)
|
||||
handler := "aAdmission{
|
||||
Handler: admission.NewHandler(admission.Create, admission.Update),
|
||||
client: kubeClient,
|
||||
indexer: indexer,
|
||||
registry: install.NewRegistry(kubeClient),
|
||||
Handler: admission.NewHandler(admission.Create, admission.Update),
|
||||
evaluator: evaluator,
|
||||
}
|
||||
handler.indexer.Add(resourceQuotaNonTerminating)
|
||||
handler.indexer.Add(resourceQuotaTerminating)
|
||||
indexer.Add(resourceQuotaNonTerminating)
|
||||
indexer.Add(resourceQuotaTerminating)
|
||||
|
||||
// create a pod that has an active deadline
|
||||
newPod := validPod("allowed-pod", 1, getResourceRequirements(getResourceList("100m", "2Gi"), getResourceList("", "")))
|
||||
@ -378,8 +385,9 @@ func TestAdmitBelowTerminatingQuotaLimit(t *testing.T) {
|
||||
t.Errorf("Expected actions:\n%v\n but got:\n%v\nDifference:\n%v", expectedActionSet, actionSet, expectedActionSet.Difference(actionSet))
|
||||
}
|
||||
|
||||
lastActionIndex := len(kubeClient.Actions()) - 1
|
||||
usage := kubeClient.Actions()[lastActionIndex].(testcore.UpdateAction).GetObject().(*api.ResourceQuota)
|
||||
decimatedActions := removeListWatch(kubeClient.Actions())
|
||||
lastActionIndex := len(decimatedActions) - 1
|
||||
usage := decimatedActions[lastActionIndex].(testcore.UpdateAction).GetObject().(*api.ResourceQuota)
|
||||
|
||||
// ensure only the quota-terminating was updated
|
||||
if usage.Name != resourceQuotaTerminating.Name {
|
||||
@ -443,14 +451,15 @@ func TestAdmitBelowBestEffortQuotaLimit(t *testing.T) {
|
||||
}
|
||||
kubeClient := fake.NewSimpleClientset(resourceQuotaBestEffort, resourceQuotaNotBestEffort)
|
||||
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc})
|
||||
evaluator, _ := newQuotaEvaluator(kubeClient, install.NewRegistry(kubeClient))
|
||||
evaluator.indexer = indexer
|
||||
evaluator.Run(5)
|
||||
handler := "aAdmission{
|
||||
Handler: admission.NewHandler(admission.Create, admission.Update),
|
||||
client: kubeClient,
|
||||
indexer: indexer,
|
||||
registry: install.NewRegistry(kubeClient),
|
||||
Handler: admission.NewHandler(admission.Create, admission.Update),
|
||||
evaluator: evaluator,
|
||||
}
|
||||
handler.indexer.Add(resourceQuotaBestEffort)
|
||||
handler.indexer.Add(resourceQuotaNotBestEffort)
|
||||
indexer.Add(resourceQuotaBestEffort)
|
||||
indexer.Add(resourceQuotaNotBestEffort)
|
||||
|
||||
// create a pod that is best effort because it does not make a request for anything
|
||||
newPod := validPod("allowed-pod", 1, getResourceRequirements(getResourceList("", ""), getResourceList("", "")))
|
||||
@ -468,8 +477,9 @@ func TestAdmitBelowBestEffortQuotaLimit(t *testing.T) {
|
||||
if !actionSet.HasAll(expectedActionSet.List()...) {
|
||||
t.Errorf("Expected actions:\n%v\n but got:\n%v\nDifference:\n%v", expectedActionSet, actionSet, expectedActionSet.Difference(actionSet))
|
||||
}
|
||||
lastActionIndex := len(kubeClient.Actions()) - 1
|
||||
usage := kubeClient.Actions()[lastActionIndex].(testcore.UpdateAction).GetObject().(*api.ResourceQuota)
|
||||
decimatedActions := removeListWatch(kubeClient.Actions())
|
||||
lastActionIndex := len(decimatedActions) - 1
|
||||
usage := decimatedActions[lastActionIndex].(testcore.UpdateAction).GetObject().(*api.ResourceQuota)
|
||||
|
||||
if usage.Name != resourceQuotaBestEffort.Name {
|
||||
t.Errorf("Incremented the wrong quota, expected %v, actual %v", resourceQuotaBestEffort.Name, usage.Name)
|
||||
@ -495,6 +505,19 @@ func TestAdmitBelowBestEffortQuotaLimit(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func removeListWatch(in []testcore.Action) []testcore.Action {
|
||||
decimatedActions := []testcore.Action{}
|
||||
// list and watch of resource quotas are done to maintain our cache, so those actions are expected. Remove them from the results
|
||||
for i := range in {
|
||||
if in[i].Matches("list", "resourcequotas") || in[i].Matches("watch", "resourcequotas") {
|
||||
continue
|
||||
}
|
||||
|
||||
decimatedActions = append(decimatedActions, in[i])
|
||||
}
|
||||
return decimatedActions
|
||||
}
|
||||
|
||||
// TestAdmitBestEffortQuotaLimitIgnoresBurstable validates that a besteffort quota does not match a resource
|
||||
// guaranteed pod.
|
||||
func TestAdmitBestEffortQuotaLimitIgnoresBurstable(t *testing.T) {
|
||||
@ -514,20 +537,23 @@ func TestAdmitBestEffortQuotaLimitIgnoresBurstable(t *testing.T) {
|
||||
}
|
||||
kubeClient := fake.NewSimpleClientset(resourceQuota)
|
||||
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc})
|
||||
evaluator, _ := newQuotaEvaluator(kubeClient, install.NewRegistry(kubeClient))
|
||||
evaluator.indexer = indexer
|
||||
evaluator.Run(5)
|
||||
handler := "aAdmission{
|
||||
Handler: admission.NewHandler(admission.Create, admission.Update),
|
||||
client: kubeClient,
|
||||
indexer: indexer,
|
||||
registry: install.NewRegistry(kubeClient),
|
||||
Handler: admission.NewHandler(admission.Create, admission.Update),
|
||||
evaluator: evaluator,
|
||||
}
|
||||
handler.indexer.Add(resourceQuota)
|
||||
indexer.Add(resourceQuota)
|
||||
newPod := validPod("allowed-pod", 1, getResourceRequirements(getResourceList("100m", "1Gi"), getResourceList("", "")))
|
||||
err := handler.Admit(admission.NewAttributesRecord(newPod, api.Kind("Pod"), newPod.Namespace, newPod.Name, api.Resource("pods"), "", admission.Create, nil))
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error: %v", err)
|
||||
}
|
||||
if len(kubeClient.Actions()) != 0 {
|
||||
t.Errorf("Expected no client actions because the incoming pod did not match best effort quota")
|
||||
|
||||
decimatedActions := removeListWatch(kubeClient.Actions())
|
||||
if len(decimatedActions) != 0 {
|
||||
t.Errorf("Expected no client actions because the incoming pod did not match best effort quota: %v", kubeClient.Actions())
|
||||
}
|
||||
}
|
||||
|
||||
@ -623,13 +649,15 @@ func TestAdmissionSetsMissingNamespace(t *testing.T) {
|
||||
podEvaluator.GroupKind(): podEvaluator,
|
||||
},
|
||||
}
|
||||
evaluator, _ := newQuotaEvaluator(kubeClient, install.NewRegistry(kubeClient))
|
||||
evaluator.indexer = indexer
|
||||
evaluator.registry = registry
|
||||
evaluator.Run(5)
|
||||
handler := "aAdmission{
|
||||
Handler: admission.NewHandler(admission.Create, admission.Update),
|
||||
client: kubeClient,
|
||||
indexer: indexer,
|
||||
registry: registry,
|
||||
Handler: admission.NewHandler(admission.Create, admission.Update),
|
||||
evaluator: evaluator,
|
||||
}
|
||||
handler.indexer.Add(resourceQuota)
|
||||
indexer.Add(resourceQuota)
|
||||
newPod := validPod("pod-without-namespace", 1, getResourceRequirements(getResourceList("1", "2Gi"), getResourceList("", "")))
|
||||
|
||||
// unset the namespace
|
||||
|
plugin/pkg/admission/resourcequota/controller.go (new file, 502 lines)
@ -0,0 +1,502 @@
|
||||
/*
|
||||
Copyright 2016 The Kubernetes Authors All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package resourcequota
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/golang-lru"
|
||||
|
||||
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
|
||||
|
||||
"k8s.io/kubernetes/pkg/admission"
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/meta"
|
||||
"k8s.io/kubernetes/pkg/client/cache"
|
||||
"k8s.io/kubernetes/pkg/quota"
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
|
||||
"k8s.io/kubernetes/pkg/util/sets"
|
||||
"k8s.io/kubernetes/pkg/util/wait"
|
||||
"k8s.io/kubernetes/pkg/util/workqueue"
|
||||
"k8s.io/kubernetes/pkg/watch"
|
||||
)
|
||||
|
||||
type quotaEvaluator struct {
|
||||
client clientset.Interface
|
||||
|
||||
// indexer that holds quota objects by namespace
|
||||
indexer cache.Indexer
|
||||
// registry that knows how to measure usage for objects
|
||||
registry quota.Registry
|
||||
|
||||
// liveLookups holds the last few live lookups we've done to help amortize cost on repeated lookup failures.
|
||||
// This lets us handle the case of latent caches by looking up actual results for a namespace on a cache miss / no results.
|
||||
// We track the lookup result here so that for repeated requests, we don't look it up very often.
|
||||
liveLookupCache *lru.Cache
|
||||
liveTTL time.Duration
|
||||
|
||||
// TODO these are used together to bucket items by namespace and then batch them up for processing.
|
||||
// The technique is valuable for rollup activities to avoid fanout and reduce resource contention.
|
||||
// We could move this into a library if another component needed it.
|
||||
// queue is indexed by namespace, so that we bundle up on a per-namespace basis
|
||||
queue *workqueue.Type
|
||||
workLock sync.Mutex
|
||||
work map[string][]*admissionWaiter
|
||||
dirtyWork map[string][]*admissionWaiter
|
||||
inProgress sets.String
|
||||
}
|
||||
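The liveLookupCache and liveTTL fields above implement the pattern the comment describes: on an indexer miss, do one live List, stamp the result with an expiry, and keep it in an LRU so repeated misses for the same namespace stay cheap. A small standalone sketch of that pattern, assuming only the hashicorp/golang-lru API already imported by this file; the quotaEntry type and fetch callback are placeholders, not the real client or API objects.

package main

import (
	"fmt"
	"time"

	lru "github.com/hashicorp/golang-lru"
)

// quotaEntry is a TTL-stamped cache entry; items stands in for []*api.ResourceQuota.
type quotaEntry struct {
	expiry time.Time
	items  []string
}

type liveLookup struct {
	cache *lru.Cache
	ttl   time.Duration
	fetch func(namespace string) []string // stand-in for an expensive live List call
}

// quotasFor returns cached results while they are fresh, otherwise performs
// one live lookup and primes the cache for subsequent callers.
func (l *liveLookup) quotasFor(namespace string) []string {
	if obj, ok := l.cache.Get(namespace); ok {
		entry := obj.(quotaEntry)
		if entry.expiry.After(time.Now()) {
			return entry.items
		}
	}
	entry := quotaEntry{expiry: time.Now().Add(l.ttl), items: l.fetch(namespace)}
	l.cache.Add(namespace, entry)
	return entry.items
}

func main() {
	cache, err := lru.New(100)
	if err != nil {
		panic(err)
	}
	calls := 0
	l := &liveLookup{
		cache: cache,
		ttl:   30 * time.Second,
		fetch: func(ns string) []string {
			calls++
			return []string{ns + "/quota"}
		},
	}
	l.quotasFor("demo")
	l.quotasFor("demo") // served from the LRU, no second live lookup
	fmt.Printf("live lookups performed: %d\n", calls)
}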
|
||||
type admissionWaiter struct {
|
||||
attributes admission.Attributes
|
||||
finished chan struct{}
|
||||
result error
|
||||
}
|
||||
|
||||
type defaultDeny struct{}
|
||||
|
||||
func (defaultDeny) Error() string {
|
||||
return "DEFAULT DENY"
|
||||
}
|
||||
|
||||
func IsDefaultDeny(err error) bool {
|
||||
if err == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
_, ok := err.(defaultDeny)
|
||||
return ok
|
||||
}
|
||||
|
||||
func newAdmissionWaiter(a admission.Attributes) *admissionWaiter {
|
||||
return &admissionWaiter{
|
||||
attributes: a,
|
||||
finished: make(chan struct{}),
|
||||
result: defaultDeny{},
|
||||
}
|
||||
}
|
||||
|
||||
// newQuotaEvaluator configures an admission controller that can enforce quota constraints
|
||||
// using the provided registry. The registry must have the capability to handle group/kinds that
|
||||
// are persisted by the server this admission controller is intercepting
|
||||
func newQuotaEvaluator(client clientset.Interface, registry quota.Registry) (*quotaEvaluator, error) {
|
||||
liveLookupCache, err := lru.New(100)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
lw := &cache.ListWatch{
|
||||
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
|
||||
return client.Core().ResourceQuotas(api.NamespaceAll).List(options)
|
||||
},
|
||||
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
|
||||
return client.Core().ResourceQuotas(api.NamespaceAll).Watch(options)
|
||||
},
|
||||
}
|
||||
indexer, reflector := cache.NewNamespaceKeyedIndexerAndReflector(lw, &api.ResourceQuota{}, 0)
|
||||
|
||||
reflector.Run()
|
||||
return "aEvaluator{
|
||||
client: client,
|
||||
indexer: indexer,
|
||||
registry: registry,
|
||||
liveLookupCache: liveLookupCache,
|
||||
liveTTL: time.Duration(30 * time.Second),
|
||||
|
||||
queue: workqueue.New(),
|
||||
work: map[string][]*admissionWaiter{},
|
||||
dirtyWork: map[string][]*admissionWaiter{},
|
||||
inProgress: sets.String{},
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Run begins watching and syncing.
|
||||
func (e *quotaEvaluator) Run(workers int) {
|
||||
defer utilruntime.HandleCrash()
|
||||
|
||||
for i := 0; i < workers; i++ {
|
||||
go wait.Until(e.doWork, time.Second, make(chan struct{}))
|
||||
}
|
||||
}
|
||||
|
||||
func (e *quotaEvaluator) doWork() {
|
||||
for {
|
||||
func() {
|
||||
ns, admissionAttributes := e.getWork()
|
||||
defer e.completeWork(ns)
|
||||
if len(admissionAttributes) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
e.checkAttributes(ns, admissionAttributes)
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
// checkAttributes iterates over and evaluates all the waiting admissionAttributes. It will always notify all waiters
|
||||
// before returning. The default is to deny.
|
||||
func (e *quotaEvaluator) checkAttributes(ns string, admissionAttributes []*admissionWaiter) {
|
||||
// notify all on exit
|
||||
defer func() {
|
||||
for _, admissionAttribute := range admissionAttributes {
|
||||
close(admissionAttribute.finished)
|
||||
}
|
||||
}()
|
||||
|
||||
quotas, err := e.getQuotas(ns)
|
||||
if err != nil {
|
||||
for _, admissionAttribute := range admissionAttributes {
|
||||
admissionAttribute.result = err
|
||||
}
|
||||
return
|
||||
}
|
||||
if len(quotas) == 0 {
|
||||
for _, admissionAttribute := range admissionAttributes {
|
||||
admissionAttribute.result = nil
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
e.checkQuotas(quotas, admissionAttributes, 3)
|
||||
}
|
||||
|
||||
// checkQuotas checks the admission attributes against the passed quotas. If a quota applies, it will attempt to update it
|
||||
// AFTER it has checked all the admissionAttributes. The method breaks down into phases like this:
|
||||
// 0. make a copy of the quotas to act as a "running" quota so we know what we need to update and can still compare against the
|
||||
// originals
|
||||
// 1. check each admission attribute to see if it fits within *all* the quotas. If it doesn't fit, mark the waiter as failed
|
||||
// and the running quotas don't change. If it did fit, check to see if any quota was changed. If there was no quota change,
|
||||
// mark the waiter as succeeded. If some quota did change, update the running quotas
|
||||
// 2. If no running quota was changed, return now since no updates are needed.
|
||||
// 3. for each quota that has changed, attempt an update. If all updates succeeded, update all unset waiters to success status and return. If some
|
||||
// updates failed on conflict errors and we have retries left, re-get the failed quota from our cache for the latest version
|
||||
// and recurse into this method with the subset. It's safe for us to evaluate ONLY the subset, because the other quota
|
||||
// documents for these waiters have already been evaluated. Step 1 will mark all the ones that should already have succeeded.
|
||||
func (e *quotaEvaluator) checkQuotas(quotas []api.ResourceQuota, admissionAttributes []*admissionWaiter, remainingRetries int) {
|
||||
// yet another copy to compare against originals to see if we actually have deltas
|
||||
originalQuotas := make([]api.ResourceQuota, len(quotas), len(quotas))
|
||||
copy(originalQuotas, quotas)
|
||||
|
||||
atLeastOneChanged := false
|
||||
for i := range admissionAttributes {
|
||||
admissionAttribute := admissionAttributes[i]
|
||||
newQuotas, err := e.checkRequest(quotas, admissionAttribute.attributes)
|
||||
if err != nil {
|
||||
admissionAttribute.result = err
|
||||
continue
|
||||
}
|
||||
|
||||
// if the new quotas are the same as the old quotas, then this particular one doesn't issue any updates
|
||||
// that means that no quota docs applied, so it can get a pass
|
||||
atLeastOneChangeForThisWaiter := false
|
||||
for j := range newQuotas {
|
||||
if !quota.Equals(originalQuotas[j].Status.Used, newQuotas[j].Status.Used) {
|
||||
atLeastOneChanged = true
|
||||
atLeastOneChangeForThisWaiter = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if !atLeastOneChangeForThisWaiter {
|
||||
admissionAttribute.result = nil
|
||||
}
|
||||
quotas = newQuotas
|
||||
}
|
||||
|
||||
// if none of the requests changed anything, there's no reason to issue an update, just fail them all now
|
||||
if !atLeastOneChanged {
|
||||
return
|
||||
}
|
||||
|
||||
// now go through and try to issue updates. Things get a little weird here:
|
||||
// 1. check to see if the quota changed. If not, skip.
|
||||
// 2. if the quota changed and the update passes, be happy
|
||||
// 3. if the quota changed and the update fails, add the original to a retry list
|
||||
var updatedFailedQuotas []api.ResourceQuota
|
||||
var lastErr error
|
||||
for i := range quotas {
|
||||
newQuota := quotas[i]
|
||||
// if this quota didn't have its status changed, skip it
|
||||
if quota.Equals(originalQuotas[i].Status.Used, newQuota.Status.Used) {
|
||||
continue
|
||||
}
|
||||
|
||||
if _, err := e.client.Core().ResourceQuotas(newQuota.Namespace).UpdateStatus(&newQuota); err != nil {
|
||||
updatedFailedQuotas = append(updatedFailedQuotas, newQuota)
|
||||
lastErr = err
|
||||
}
|
||||
}
|
||||
|
||||
if len(updatedFailedQuotas) == 0 {
|
||||
// all the updates succeeded. At this point, anything with the default deny error was just waiting to
|
||||
// get a successful update, so we can mark and notify
|
||||
for _, admissionAttribute := range admissionAttributes {
|
||||
if IsDefaultDeny(admissionAttribute.result) {
|
||||
admissionAttribute.result = nil
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// at this point, errors are fatal. Update all waiters without status to failed and return
|
||||
if remainingRetries <= 0 {
|
||||
for _, admissionAttribute := range admissionAttributes {
|
||||
if IsDefaultDeny(admissionAttribute.result) {
|
||||
admissionAttribute.result = lastErr
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// this retry logic has the same bug: it's possible to be checking against quota in a state that never actually exists, where
|
||||
// you've added a new document, then updated an old one; your resource matches both and you're only checking one.
|
||||
// updates for these quota names failed. Get the current quotas in the namespace, compare by name, check to see if the
|
||||
// resource versions have changed. If not, we're going to fall through and fail everything. If they all have changed, then we can try again
|
||||
newQuotas, err := e.getQuotas(quotas[0].Namespace)
|
||||
if err != nil {
|
||||
// this means that updates failed. Anything with a default deny error has failed and we need to let them know
|
||||
for _, admissionAttribute := range admissionAttributes {
|
||||
if IsDefaultDeny(admissionAttribute.result) {
|
||||
admissionAttribute.result = lastErr
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// this logic goes through our cache to find the new version of all quotas that failed update. If something has been removed
|
||||
// it is skipped on this retry. After all, you removed it.
|
||||
quotasToCheck := []api.ResourceQuota{}
|
||||
for _, newQuota := range newQuotas {
|
||||
for _, oldQuota := range updatedFailedQuotas {
|
||||
if newQuota.Name == oldQuota.Name {
|
||||
quotasToCheck = append(quotasToCheck, newQuota)
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
e.checkQuotas(quotasToCheck, admissionAttributes, remainingRetries-1)
|
||||
}
|
||||
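The retry behaviour documented above boils down to optimistic concurrency: apply the new usage, attempt the status update, and on a version conflict re-read the latest document and try again a bounded number of times. A self-contained toy sketch of that loop follows; the fakeClient and quotaDoc types are invented stand-ins, not the real clientset or ResourceQuota objects.

package main

import (
	"errors"
	"fmt"
)

var errConflict = errors.New("conflict: stale resourceVersion")

// quotaDoc is a toy stand-in for a ResourceQuota status document.
type quotaDoc struct {
	resourceVersion int
	used            int
	hard            int
}

// fakeClient mimics a server that rejects writes carrying a stale version.
type fakeClient struct{ server quotaDoc }

func (c *fakeClient) get() quotaDoc { return c.server }

func (c *fakeClient) updateStatus(q quotaDoc) error {
	if q.resourceVersion != c.server.resourceVersion {
		return errConflict // someone else wrote first; caller must re-read and retry
	}
	q.resourceVersion++
	c.server = q
	return nil
}

// charge adds delta to the quota's usage, retrying on conflict up to retries times.
func charge(c *fakeClient, delta, retries int) error {
	for attempt := 0; attempt <= retries; attempt++ {
		q := c.get() // always start from the latest observed document
		if q.used+delta > q.hard {
			return fmt.Errorf("exceeded quota: requested %d, used %d, limited %d", delta, q.used, q.hard)
		}
		q.used += delta
		err := c.updateStatus(q)
		if err == nil {
			return nil
		}
		if !errors.Is(err, errConflict) {
			return err // anything other than a conflict is fatal
		}
		// conflict: loop again with a freshly read document
	}
	return fmt.Errorf("unable to update quota status after %d retries", retries)
}

func main() {
	c := &fakeClient{server: quotaDoc{resourceVersion: 1, used: 2, hard: 5}}
	fmt.Println(charge(c, 2, 3)) // <nil>
	fmt.Println(charge(c, 9, 3)) // exceeded quota: requested 9, used 4, limited 5
}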
|
||||
// checkRequest verifies that the request does not exceed any quota constraint. It returns a copy of the quotas, not yet persisted,
|
||||
// that captures what the usage would be if the request succeeded. It returns an error if there is insufficient quota to satisfy the request.
|
||||
func (e *quotaEvaluator) checkRequest(quotas []api.ResourceQuota, a admission.Attributes) ([]api.ResourceQuota, error) {
|
||||
namespace := a.GetNamespace()
|
||||
name := a.GetName()
|
||||
|
||||
evaluators := e.registry.Evaluators()
|
||||
evaluator, found := evaluators[a.GetKind()]
|
||||
if !found {
|
||||
return quotas, nil
|
||||
}
|
||||
|
||||
op := a.GetOperation()
|
||||
operationResources := evaluator.OperationResources(op)
|
||||
if len(operationResources) == 0 {
|
||||
return quotas, nil
|
||||
}
|
||||
|
||||
// find the set of quotas that are pertinent to this request
|
||||
// reject if we match the quota, but usage is not calculated yet
|
||||
// reject if the input object does not satisfy quota constraints
|
||||
// if there are no pertinent quotas, we can just return
|
||||
inputObject := a.GetObject()
|
||||
interestingQuotaIndexes := []int{}
|
||||
for i := range quotas {
|
||||
resourceQuota := quotas[i]
|
||||
match := evaluator.Matches(&resourceQuota, inputObject)
|
||||
if !match {
|
||||
continue
|
||||
}
|
||||
|
||||
hardResources := quota.ResourceNames(resourceQuota.Status.Hard)
|
||||
evaluatorResources := evaluator.MatchesResources()
|
||||
requiredResources := quota.Intersection(hardResources, evaluatorResources)
|
||||
err := evaluator.Constraints(requiredResources, inputObject)
|
||||
if err != nil {
|
||||
return nil, admission.NewForbidden(a, fmt.Errorf("Failed quota: %s: %v", resourceQuota.Name, err))
|
||||
}
|
||||
if !hasUsageStats(&resourceQuota) {
|
||||
return nil, admission.NewForbidden(a, fmt.Errorf("Status unknown for quota: %s", resourceQuota.Name))
|
||||
}
|
||||
|
||||
interestingQuotaIndexes = append(interestingQuotaIndexes, i)
|
||||
}
|
||||
if len(interestingQuotaIndexes) == 0 {
|
||||
return quotas, nil
|
||||
}
|
||||
|
||||
// Usage of some resources cannot be counted in isolation. For example when
|
||||
// the resource represents a number of unique references to external
|
||||
// resource. In such a case an evaluator needs to process other objects in
|
||||
// the same namespace which needs to be known.
|
||||
if accessor, err := meta.Accessor(inputObject); namespace != "" && err == nil {
|
||||
if accessor.GetNamespace() == "" {
|
||||
accessor.SetNamespace(namespace)
|
||||
}
|
||||
}
|
||||
|
||||
// there is at least one quota that definitely matches our object
|
||||
// as a result, we need to measure the usage of this object for quota
|
||||
// on updates, we need to subtract the previous measured usage
|
||||
// if usage shows no change, just return since it has no impact on quota
|
||||
deltaUsage := evaluator.Usage(inputObject)
|
||||
if admission.Update == op {
|
||||
prevItem, err := evaluator.Get(namespace, name)
|
||||
if err != nil {
|
||||
return nil, admission.NewForbidden(a, fmt.Errorf("Unable to get previous: %v", err))
|
||||
}
|
||||
prevUsage := evaluator.Usage(prevItem)
|
||||
deltaUsage = quota.Subtract(deltaUsage, prevUsage)
|
||||
}
|
||||
if quota.IsZero(deltaUsage) {
|
||||
return quotas, nil
|
||||
}
|
||||
|
||||
for _, index := range interestingQuotaIndexes {
|
||||
resourceQuota := quotas[index]
|
||||
|
||||
hardResources := quota.ResourceNames(resourceQuota.Status.Hard)
|
||||
requestedUsage := quota.Mask(deltaUsage, hardResources)
|
||||
newUsage := quota.Add(resourceQuota.Status.Used, requestedUsage)
|
||||
if allowed, exceeded := quota.LessThanOrEqual(newUsage, resourceQuota.Status.Hard); !allowed {
|
||||
failedRequestedUsage := quota.Mask(requestedUsage, exceeded)
|
||||
failedUsed := quota.Mask(resourceQuota.Status.Used, exceeded)
|
||||
failedHard := quota.Mask(resourceQuota.Status.Hard, exceeded)
|
||||
return nil, admission.NewForbidden(a,
|
||||
fmt.Errorf("Exceeded quota: %s, requested: %s, used: %s, limited: %s",
|
||||
resourceQuota.Name,
|
||||
prettyPrint(failedRequestedUsage),
|
||||
prettyPrint(failedUsed),
|
||||
prettyPrint(failedHard)))
|
||||
}
|
||||
|
||||
// update to the new usage number
|
||||
quotas[index].Status.Used = newUsage
|
||||
}
|
||||
|
||||
return quotas, nil
|
||||
}
|
||||
|
||||
func (e *quotaEvaluator) evaluate(a admission.Attributes) error {
|
||||
waiter := newAdmissionWaiter(a)
|
||||
|
||||
e.addWork(waiter)
|
||||
|
||||
// wait for completion or timeout
|
||||
select {
|
||||
case <-waiter.finished:
|
||||
case <-time.After(10 * time.Second):
|
||||
return fmt.Errorf("timeout")
|
||||
}
|
||||
|
||||
return waiter.result
|
||||
}
|
||||
|
||||
func (e *quotaEvaluator) addWork(a *admissionWaiter) {
|
||||
e.workLock.Lock()
|
||||
defer e.workLock.Unlock()
|
||||
|
||||
ns := a.attributes.GetNamespace()
|
||||
// this Add can trigger a Get BEFORE the work is added to a list, but this is ok because the getWork routine
|
||||
// waits on the workLock before retrieving the work to do, so the writes in this method will be observed
|
||||
e.queue.Add(ns)
|
||||
|
||||
if e.inProgress.Has(ns) {
|
||||
e.dirtyWork[ns] = append(e.dirtyWork[ns], a)
|
||||
return
|
||||
}
|
||||
|
||||
e.work[ns] = append(e.work[ns], a)
|
||||
}
|
||||
|
||||
func (e *quotaEvaluator) completeWork(ns string) {
|
||||
e.workLock.Lock()
|
||||
defer e.workLock.Unlock()
|
||||
|
||||
e.queue.Done(ns)
|
||||
e.work[ns] = e.dirtyWork[ns]
|
||||
delete(e.dirtyWork, ns)
|
||||
e.inProgress.Delete(ns)
|
||||
}
|
||||
|
||||
func (e *quotaEvaluator) getWork() (string, []*admissionWaiter) {
|
||||
uncastNS, _ := e.queue.Get()
|
||||
ns := uncastNS.(string)
|
||||
|
||||
e.workLock.Lock()
|
||||
defer e.workLock.Unlock()
|
||||
// at this point, we know we have a coherent view of e.work. It is entirely possible
|
||||
// that our workqueue has another item requeued to it, but we'll pick it up early. This is ok
|
||||
// because next time it will go into our dirty list
|
||||
|
||||
work := e.work[ns]
|
||||
delete(e.work, ns)
|
||||
delete(e.dirtyWork, ns)
|
||||
|
||||
if len(work) != 0 {
|
||||
e.inProgress.Insert(ns)
|
||||
return ns, work
|
||||
}
|
||||
|
||||
e.queue.Done(ns)
|
||||
e.inProgress.Delete(ns)
|
||||
return ns, []*admissionWaiter{}
|
||||
}
|
||||
|
||||
func (e *quotaEvaluator) getQuotas(namespace string) ([]api.ResourceQuota, error) {
|
||||
// determine if there are any quotas in this namespace
|
||||
// if there are no quotas, we don't need to do anything
|
||||
items, err := e.indexer.Index("namespace", &api.ResourceQuota{ObjectMeta: api.ObjectMeta{Namespace: namespace, Name: ""}})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Error resolving quota.")
|
||||
}
|
||||
|
||||
// if there are no items held in our indexer, check our live-lookup LRU; if that misses, do the live lookup to prime it.
|
||||
if len(items) == 0 {
|
||||
lruItemObj, ok := e.liveLookupCache.Get(namespace)
|
||||
if !ok || lruItemObj.(liveLookupEntry).expiry.Before(time.Now()) {
|
||||
// TODO: If there are multiple operations at the same time and cache has just expired,
|
||||
// this may cause multiple List operations being issued at the same time.
|
||||
// If there is already in-flight List() for a given namespace, we should wait until
|
||||
// it is finished and cache is updated instead of doing the same, also to avoid
|
||||
// throttling - see #22422 for details.
|
||||
liveList, err := e.client.Core().ResourceQuotas(namespace).List(api.ListOptions{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
newEntry := liveLookupEntry{expiry: time.Now().Add(e.liveTTL)}
|
||||
for i := range liveList.Items {
|
||||
newEntry.items = append(newEntry.items, &liveList.Items[i])
|
||||
}
|
||||
e.liveLookupCache.Add(namespace, newEntry)
|
||||
lruItemObj = newEntry
|
||||
}
|
||||
lruEntry := lruItemObj.(liveLookupEntry)
|
||||
for i := range lruEntry.items {
|
||||
items = append(items, lruEntry.items[i])
|
||||
}
|
||||
}
|
||||
|
||||
resourceQuotas := []api.ResourceQuota{}
|
||||
for i := range items {
|
||||
// always make a copy. We're going to muck around with this and we should never mutate the originals
|
||||
resourceQuotas = append(resourceQuotas, *items[i].(*api.ResourceQuota))
|
||||
}
|
||||
|
||||
return resourceQuotas, nil
|
||||
}
|
test/integration/quota_test.go (new file, 205 lines)
@ -0,0 +1,205 @@
|
||||
// +build integration,!no-etcd
|
||||
|
||||
/*
|
||||
Copyright 2015 The Kubernetes Authors All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package integration
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/resource"
|
||||
"k8s.io/kubernetes/pkg/api/testapi"
|
||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
|
||||
"k8s.io/kubernetes/pkg/client/restclient"
|
||||
"k8s.io/kubernetes/pkg/controller"
|
||||
replicationcontroller "k8s.io/kubernetes/pkg/controller/replication"
|
||||
resourcequotacontroller "k8s.io/kubernetes/pkg/controller/resourcequota"
|
||||
"k8s.io/kubernetes/pkg/fields"
|
||||
"k8s.io/kubernetes/pkg/labels"
|
||||
"k8s.io/kubernetes/pkg/master"
|
||||
quotainstall "k8s.io/kubernetes/pkg/quota/install"
|
||||
"k8s.io/kubernetes/pkg/util/wait"
|
||||
"k8s.io/kubernetes/pkg/watch"
|
||||
"k8s.io/kubernetes/plugin/pkg/admission/resourcequota"
|
||||
"k8s.io/kubernetes/test/integration/framework"
|
||||
)
|
||||
|
||||
// 1.2 code gets:
|
||||
// quota_test.go:95: Took 4.218619579s to scale up without quota
|
||||
// quota_test.go:199: unexpected error: timed out waiting for the condition, ended with 342 pods (1 minute)
|
||||
// 1.3+ code gets:
|
||||
// quota_test.go:100: Took 4.196205966s to scale up without quota
|
||||
// quota_test.go:115: Took 12.021640372s to scale up with quota
|
||||
func TestQuota(t *testing.T) {
|
||||
framework.DeleteAllEtcdKeys()
|
||||
|
||||
// Set up a master
|
||||
var m *master.Master
|
||||
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
||||
m.Handler.ServeHTTP(w, req)
|
||||
}))
|
||||
// TODO: Uncomment when fix #19254
|
||||
// defer s.Close()
|
||||
clientset := clientset.NewForConfigOrDie(&restclient.Config{QPS: -1, Host: s.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
|
||||
admission, err := resourcequota.NewResourceQuota(clientset, quotainstall.NewRegistry(clientset), 5)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
masterConfig := framework.NewIntegrationTestMasterConfig()
|
||||
masterConfig.AdmissionControl = admission
|
||||
m, err = master.New(masterConfig)
|
||||
if err != nil {
|
||||
t.Fatalf("Error in bringing up the master: %v", err)
|
||||
}
|
||||
|
||||
go replicationcontroller.NewReplicationManagerFromClient(clientset, controller.NoResyncPeriodFunc, replicationcontroller.BurstReplicas, 4096).
|
||||
Run(3, wait.NeverStop)
|
||||
|
||||
resourceQuotaRegistry := quotainstall.NewRegistry(clientset)
|
||||
groupKindsToReplenish := []unversioned.GroupKind{
|
||||
api.Kind("Pod"),
|
||||
api.Kind("Service"),
|
||||
api.Kind("ReplicationController"),
|
||||
api.Kind("PersistentVolumeClaim"),
|
||||
api.Kind("Secret"),
|
||||
}
|
||||
resourceQuotaControllerOptions := &resourcequotacontroller.ResourceQuotaControllerOptions{
|
||||
KubeClient: clientset,
|
||||
ResyncPeriod: controller.NoResyncPeriodFunc,
|
||||
Registry: resourceQuotaRegistry,
|
||||
GroupKindsToReplenish: groupKindsToReplenish,
|
||||
ReplenishmentResyncPeriod: controller.NoResyncPeriodFunc,
|
||||
ControllerFactory: resourcequotacontroller.NewReplenishmentControllerFactoryFromClient(clientset),
|
||||
}
|
||||
go resourcequotacontroller.NewResourceQuotaController(resourceQuotaControllerOptions).Run(2, wait.NeverStop)
|
||||
|
||||
startTime := time.Now()
|
||||
scale(t, api.NamespaceDefault, clientset)
|
||||
endTime := time.Now()
|
||||
t.Logf("Took %v to scale up without quota", endTime.Sub(startTime))
|
||||
|
||||
quota := &api.ResourceQuota{
|
||||
ObjectMeta: api.ObjectMeta{Name: "quota"},
|
||||
Spec: api.ResourceQuotaSpec{
|
||||
Hard: api.ResourceList{
|
||||
api.ResourcePods: resource.MustParse("1000"),
|
||||
},
|
||||
},
|
||||
}
|
||||
waitForQuota(t, quota, clientset)
|
||||
|
||||
startTime = time.Now()
|
||||
scale(t, "quotaed", clientset)
|
||||
endTime = time.Now()
|
||||
t.Logf("Took %v to scale up with quota", endTime.Sub(startTime))
|
||||
|
||||
}
|
||||
func waitForQuota(t *testing.T, quota *api.ResourceQuota, clientset *clientset.Clientset) {
|
||||
w, err := clientset.Core().ResourceQuotas(quota.Namespace).Watch(api.SingleObject(api.ObjectMeta{Name: quota.Name}))
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
if _, err := clientset.Core().ResourceQuotas("quotaed").Create(quota); err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
_, err = watch.Until(1*time.Minute, w, func(event watch.Event) (bool, error) {
|
||||
switch event.Type {
|
||||
case watch.Modified:
|
||||
default:
|
||||
return false, nil
|
||||
}
|
||||
|
||||
switch cast := event.Object.(type) {
|
||||
case *api.ResourceQuota:
|
||||
if len(cast.Status.Hard) > 0 {
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
|
||||
return false, nil
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func scale(t *testing.T, namespace string, clientset *clientset.Clientset) {
|
||||
target := 1000
|
||||
rc := &api.ReplicationController{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: "foo",
|
||||
Namespace: namespace,
|
||||
},
|
||||
Spec: api.ReplicationControllerSpec{
|
||||
Replicas: target,
|
||||
Selector: map[string]string{"foo": "bar"},
|
||||
Template: &api.PodTemplateSpec{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Labels: map[string]string{
|
||||
"foo": "bar",
|
||||
},
|
||||
},
|
||||
Spec: api.PodSpec{
|
||||
Containers: []api.Container{
|
||||
{
|
||||
Name: "container",
|
||||
Image: "busybox",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
w, err := clientset.Core().ReplicationControllers(namespace).Watch(api.SingleObject(api.ObjectMeta{Name: rc.Name}))
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
if _, err := clientset.Core().ReplicationControllers(namespace).Create(rc); err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
_, err = watch.Until(3*time.Minute, w, func(event watch.Event) (bool, error) {
|
||||
switch event.Type {
|
||||
case watch.Modified:
|
||||
default:
|
||||
return false, nil
|
||||
}
|
||||
|
||||
switch cast := event.Object.(type) {
|
||||
case *api.ReplicationController:
|
||||
if cast.Status.Replicas == target {
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
|
||||
return false, nil
|
||||
})
|
||||
if err != nil {
|
||||
pods, _ := clientset.Core().Pods(namespace).List(api.ListOptions{LabelSelector: labels.Everything(), FieldSelector: fields.Everything()})
|
||||
t.Fatalf("unexpected error: %v, ended with %v pods", err, len(pods.Items))
|
||||
}
|
||||
}
|