From e13ff5ece2691988503d183b495cc7c39e899647 Mon Sep 17 00:00:00 2001
From: Flavian Missi
Date: Wed, 3 Apr 2024 09:58:27 +0200
Subject: [PATCH] resourcequota: use singleflight.Group to reduce apiserver
 load

relates to #22422 and #123806
---
 .../plugin/resourcequota/resource_access.go   |  28 ++--
 .../resourcequota/resource_access_test.go     | 123 +++++++++++++++++-
 2 files changed, 138 insertions(+), 13 deletions(-)

diff --git a/staging/src/k8s.io/apiserver/pkg/admission/plugin/resourcequota/resource_access.go b/staging/src/k8s.io/apiserver/pkg/admission/plugin/resourcequota/resource_access.go
index f09b46268c8..d189446f032 100644
--- a/staging/src/k8s.io/apiserver/pkg/admission/plugin/resourcequota/resource_access.go
+++ b/staging/src/k8s.io/apiserver/pkg/admission/plugin/resourcequota/resource_access.go
@@ -21,6 +21,7 @@ import (
 	"fmt"
 	"time"
 
+	"golang.org/x/sync/singleflight"
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
@@ -51,6 +52,7 @@ type quotaAccessor struct {
 	// This lets us handle the case of latent caches, by looking up actual results for a namespace on cache miss/no results.
 	// We track the lookup result here so that for repeated requests, we don't look it up very often.
 	liveLookupCache *lru.Cache
+	group           singleflight.Group
 	liveTTL         time.Duration
 	// updatedQuotas holds a cache of quotas that we've updated. This is used to pull the "really latest" during back to
 	// back quota evaluations that touch the same quota doc. This only works because we can compare etcd resourceVersions
@@ -114,21 +116,23 @@ func (e *quotaAccessor) GetQuotas(namespace string) ([]corev1.ResourceQuota, err
 	if len(items) == 0 {
 		lruItemObj, ok := e.liveLookupCache.Get(namespace)
 		if !ok || lruItemObj.(liveLookupEntry).expiry.Before(time.Now()) {
-			// TODO: If there are multiple operations at the same time and cache has just expired,
-			// this may cause multiple List operations being issued at the same time.
-			// If there is already in-flight List() for a given namespace, we should wait until
-			// it is finished and cache is updated instead of doing the same, also to avoid
-			// throttling - see #22422 for details.
-			liveList, err := e.client.CoreV1().ResourceQuotas(namespace).List(context.TODO(), metav1.ListOptions{})
+			// Use singleflight.Group to avoid flooding the apiserver with
+			// repeated List requests on a cache miss. See #22422 for details.
+			lruItemObj, err, _ = e.group.Do(namespace, func() (interface{}, error) {
+				liveList, err := e.client.CoreV1().ResourceQuotas(namespace).List(context.TODO(), metav1.ListOptions{})
+				if err != nil {
+					return nil, err
+				}
+				newEntry := liveLookupEntry{expiry: time.Now().Add(e.liveTTL)}
+				for i := range liveList.Items {
+					newEntry.items = append(newEntry.items, &liveList.Items[i])
+				}
+				e.liveLookupCache.Add(namespace, newEntry)
+				return newEntry, nil
+			})
 			if err != nil {
 				return nil, err
 			}
-			newEntry := liveLookupEntry{expiry: time.Now().Add(e.liveTTL)}
-			for i := range liveList.Items {
-				newEntry.items = append(newEntry.items, &liveList.Items[i])
-			}
-			e.liveLookupCache.Add(namespace, newEntry)
-			lruItemObj = newEntry
 		}
 		lruEntry := lruItemObj.(liveLookupEntry)
 		for i := range lruEntry.items {
diff --git a/staging/src/k8s.io/apiserver/pkg/admission/plugin/resourcequota/resource_access_test.go b/staging/src/k8s.io/apiserver/pkg/admission/plugin/resourcequota/resource_access_test.go
index b784add8841..9bd035a8028 100644
--- a/staging/src/k8s.io/apiserver/pkg/admission/plugin/resourcequota/resource_access_test.go
+++ b/staging/src/k8s.io/apiserver/pkg/admission/plugin/resourcequota/resource_access_test.go
@@ -17,7 +17,10 @@ limitations under the License.
 package resourcequota
 
 import (
+	"fmt"
 	"reflect"
+	"sync"
+	"sync/atomic"
 	"testing"
 	"time"
 
@@ -27,6 +30,7 @@ import (
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/client-go/informers"
 	"k8s.io/client-go/kubernetes/fake"
+	core "k8s.io/client-go/testing"
 	"k8s.io/utils/lru"
 )
 
@@ -121,5 +125,122 @@ func TestLRUCacheLookup(t *testing.T) {
 			}
 		})
 	}
-
+}
+
+// TestGetQuotas ensures we do not have multiple LIST calls to the apiserver
+// in flight at any one time. This is to ensure the issue described in #22422
+// does not happen again.
+func TestGetQuotas(t *testing.T) {
+	var (
+		testNamespace1              = "test-a"
+		testNamespace2              = "test-b"
+		listCallCountTestNamespace1 int64
+		listCallCountTestNamespace2 int64
+	)
+	resourceQuota := &corev1.ResourceQuota{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: "foo",
+		},
+	}
+
+	resourceQuotas := []*corev1.ResourceQuota{resourceQuota}
+
+	kubeClient := &fake.Clientset{}
+	informerFactory := informers.NewSharedInformerFactory(kubeClient, 0)
+
+	accessor, _ := newQuotaAccessor()
+	accessor.client = kubeClient
+	accessor.lister = informerFactory.Core().V1().ResourceQuotas().Lister()
+
+	kubeClient.AddReactor("list", "resourcequotas", func(action core.Action) (bool, runtime.Object, error) {
+		switch action.GetNamespace() {
+		case testNamespace1:
+			atomic.AddInt64(&listCallCountTestNamespace1, 1)
+		case testNamespace2:
+			atomic.AddInt64(&listCallCountTestNamespace2, 1)
+		default:
+			t.Error("unexpected namespace")
+		}
+
+		resourceQuotaList := &corev1.ResourceQuotaList{
+			ListMeta: metav1.ListMeta{
+				ResourceVersion: fmt.Sprintf("%d", len(resourceQuotas)),
+			},
+		}
+		for i, quota := range resourceQuotas {
+			quota.ResourceVersion = fmt.Sprintf("%d", i)
+			quota.Namespace = action.GetNamespace()
+			resourceQuotaList.Items = append(resourceQuotaList.Items, *quota)
+		}
+		// make the handler slow so that concurrent calls overlap and exercise the singleflight group
+		time.Sleep(time.Second)
+		return true, resourceQuotaList, nil
+	})
+
+	wg := sync.WaitGroup{}
+	for i := 0; i < 10; i++ {
+		wg.Add(2)
+		// simulate concurrent calls for the same namespace after a cache miss
+		go func() {
+			defer wg.Done()
+			quotas, err := accessor.GetQuotas(testNamespace1)
+			if err != nil {
+				t.Errorf("unexpected error: %v", err)
+			}
+			if len(quotas) != len(resourceQuotas) {
+				t.Errorf("Expected %d resource quotas, got %d", len(resourceQuotas), len(quotas))
+			}
+			for _, q := range quotas {
+				if q.Namespace != testNamespace1 {
+					t.Errorf("Expected %s namespace, got %s", testNamespace1, q.Namespace)
+				}
+			}
+		}()
+
+		// a different namespace is a different singleflight key, so its calls are not coalesced with the first namespace's
+		go func() {
+			defer wg.Done()
+			quotas, err := accessor.GetQuotas(testNamespace2)
+			if err != nil {
+				t.Errorf("unexpected error: %v", err)
+			}
+			if len(quotas) != len(resourceQuotas) {
+				t.Errorf("Expected %d resource quotas, got %d", len(resourceQuotas), len(quotas))
+			}
+			for _, q := range quotas {
+				if q.Namespace != testNamespace2 {
+					t.Errorf("Expected %s namespace, got %s", testNamespace2, q.Namespace)
+				}
+			}
+		}()
+	}
+
+	// wait for all goroutines to finish
+	wg.Wait()
+	// concurrent calls for the same namespace are coalesced by the singleflight
+	// group, so even though each namespace was queried 10 times we expect
+	// exactly one LIST call per namespace.
+	if listCallCountTestNamespace1 != 1 {
+		t.Errorf("Expected 1 resource quota LIST call, got %d", listCallCountTestNamespace1)
+	}
+	if listCallCountTestNamespace2 != 1 {
+		t.Errorf("Expected 1 resource quota LIST call, got %d", listCallCountTestNamespace2)
+	}
+
+	// invalidate the cache so the next call for testNamespace1 triggers a new LIST
+	accessor.liveLookupCache.Remove(testNamespace1)
+	quotas, err := accessor.GetQuotas(testNamespace1)
+	if err != nil {
+		t.Errorf("unexpected error: %v", err)
+	}
+	if len(quotas) != len(resourceQuotas) {
+		t.Errorf("Expected %d resource quotas, got %d", len(resourceQuotas), len(quotas))
+	}
+
+	if listCallCountTestNamespace1 != 2 {
+		t.Errorf("Expected 2 resource quota LIST calls, got %d", listCallCountTestNamespace1)
+	}
+	if listCallCountTestNamespace2 != 1 {
+		t.Errorf("Expected 1 resource quota LIST call, got %d", listCallCountTestNamespace2)
+	}
 }
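
Not part of the patch: a minimal, self-contained Go sketch of how a
singleflight.Group coalesces concurrent lookups for the same key, which is the
pattern GetQuotas now relies on when the live lookup cache misses. The names
slowList, the hard-coded "test-a" key, and the timings are illustrative
stand-ins for the real ResourceQuotas List call and the namespace key.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	"golang.org/x/sync/singleflight"
)

var listCalls int64

// slowList stands in for the expensive ResourceQuotas(namespace).List() call.
func slowList(namespace string) ([]string, error) {
	atomic.AddInt64(&listCalls, 1)
	time.Sleep(100 * time.Millisecond) // pretend the apiserver is slow
	return []string{namespace + "/quota-foo"}, nil
}

func main() {
	var group singleflight.Group
	var wg sync.WaitGroup

	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Every goroutine uses the same key, so at most one slowList call
			// is in flight; the other callers block and share its result.
			v, err, _ := group.Do("test-a", func() (interface{}, error) {
				return slowList("test-a")
			})
			if err != nil {
				fmt.Println("error:", err)
				return
			}
			_ = v.([]string)
		}()
	}
	wg.Wait()

	// Typically prints 1: the ten concurrent lookups were coalesced.
	fmt.Println("list calls:", atomic.LoadInt64(&listCalls))
}

Unlike the LRU cache, the singleflight group only deduplicates calls that are
in flight at the same moment; once Do returns, the next caller for the same key
starts a fresh lookup, which is why the patch still stores the result in
liveLookupCache afterwards.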