diff --git a/plugin/pkg/admission/limitranger/admission.go b/plugin/pkg/admission/limitranger/admission.go
index 122ec149c18..4525463d2e6 100644
--- a/plugin/pkg/admission/limitranger/admission.go
+++ b/plugin/pkg/admission/limitranger/admission.go
@@ -24,6 +24,7 @@ import (
 	"strings"
 	"time"
 
+	"golang.org/x/sync/singleflight"
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/meta"
 	"k8s.io/apimachinery/pkg/api/resource"
@@ -64,6 +65,7 @@ type LimitRanger struct {
 	// This let's us handle the case of latent caches, by looking up actual results for a namespace on cache miss/no results.
 	// We track the lookup result here so that for repeated requests, we don't look it up very often.
 	liveLookupCache *lru.Cache
+	group           singleflight.Group
 	liveTTL         time.Duration
 }
 
@@ -161,21 +163,26 @@ func (l *LimitRanger) GetLimitRanges(a admission.Attributes) ([]*corev1.LimitRan
 	if len(items) == 0 {
 		lruItemObj, ok := l.liveLookupCache.Get(a.GetNamespace())
 		if !ok || lruItemObj.(liveLookupEntry).expiry.Before(time.Now()) {
-			// TODO: If there are multiple operations at the same time and cache has just expired,
-			// this may cause multiple List operations being issued at the same time.
-			// If there is already in-flight List() for a given namespace, we should wait until
-			// it is finished and cache is updated instead of doing the same, also to avoid
-			// throttling - see #22422 for details.
-			liveList, err := l.client.CoreV1().LimitRanges(a.GetNamespace()).List(context.TODO(), metav1.ListOptions{})
+			// Collapse concurrent live lookups for the same namespace into a single
+			// List call via singleflight, so that an expired or missing cache entry
+			// does not fan out into many identical requests (see #22422).
+			lruItemObj, err, _ = l.group.Do(a.GetNamespace(), func() (interface{}, error) {
+				liveList, err := l.client.CoreV1().LimitRanges(a.GetNamespace()).List(context.TODO(), metav1.ListOptions{})
+				if err != nil {
+					// The result is shared by every caller waiting on this key, so return
+					// the raw error and let each caller wrap it with its own attributes.
+					return nil, err
+				}
+				newEntry := liveLookupEntry{expiry: time.Now().Add(l.liveTTL)}
+				for i := range liveList.Items {
+					newEntry.items = append(newEntry.items, &liveList.Items[i])
+				}
+				l.liveLookupCache.Add(a.GetNamespace(), newEntry)
+				return newEntry, nil
+			})
 			if err != nil {
 				return nil, admission.NewForbidden(a, err)
 			}
-			newEntry := liveLookupEntry{expiry: time.Now().Add(l.liveTTL)}
-			for i := range liveList.Items {
-				newEntry.items = append(newEntry.items, &liveList.Items[i])
-			}
-			l.liveLookupCache.Add(a.GetNamespace(), newEntry)
-			lruItemObj = newEntry
 		}
 		lruEntry := lruItemObj.(liveLookupEntry)
 
diff --git a/plugin/pkg/admission/limitranger/admission_test.go b/plugin/pkg/admission/limitranger/admission_test.go
index 4e5879d713e..01a83816037 100644
--- a/plugin/pkg/admission/limitranger/admission_test.go
+++ b/plugin/pkg/admission/limitranger/admission_test.go
@@ -20,6 +20,8 @@ import (
 	"context"
 	"fmt"
 	"strconv"
+	"sync"
+	"sync/atomic"
 	"testing"
 	"time"
 
@@ -856,3 +858,102 @@ func TestPersistentVolumeClaimLimitFunc(t *testing.T) {
 		}
 	}
 }
+
+// TestLimitRanger_GetLimitRangesFixed22422 checks that concurrent live lookups for one namespace result in a single List call (see #22422).
+func TestLimitRanger_GetLimitRangesFixed22422(t *testing.T) {
+	limitRange := validLimitRangeNoDefaults()
+	limitRanges := []corev1.LimitRange{limitRange}
+
+	var count int64
+	mockClient := &fake.Clientset{}
+
+	unhold := make(chan struct{})
+	mockClient.AddReactor("list", "limitranges", func(action core.Action) (bool, runtime.Object, error) {
+		atomic.AddInt64(&count, 1)
+
+		limitRangeList := &corev1.LimitRangeList{
+			ListMeta: metav1.ListMeta{
+				ResourceVersion: fmt.Sprintf("%d", len(limitRanges)),
+			},
+		}
+		for index, value := range limitRanges {
+			value.ResourceVersion = fmt.Sprintf("%d", index)
+			value.Namespace = action.GetNamespace()
+			limitRangeList.Items = append(limitRangeList.Items, value)
+		}
+		// block here until the test releases this List call
+		<-unhold
+		return true, limitRangeList, nil
+	})
+
+	handler, _, err := newHandlerForTest(mockClient)
+	if err != nil {
+		t.Fatalf("unexpected error initializing handler: %v", err)
+	}
+
+	attributes := admission.NewAttributesRecord(nil, nil, api.Kind("kind").WithVersion("version"), "test", "name", api.Resource("resource").WithVersion("version"), "subresource", admission.Create, &metav1.CreateOptions{}, false, nil)
+
+	attributesTest1 := admission.NewAttributesRecord(nil, nil, api.Kind("kind").WithVersion("version"), "test1", "name", api.Resource("resource").WithVersion("version"), "subresource", admission.Create, &metav1.CreateOptions{}, false, nil)
+
+	wg := sync.WaitGroup{}
+	for i := 0; i < 10; i++ {
+		wg.Add(2)
+		// simulate concurrent calls arriving after a cache miss
+		go func() {
+			defer wg.Done()
+			ret, err := handler.GetLimitRanges(attributes)
+			if err != nil {
+				t.Errorf("unexpected error: %v", err)
+			}
+			for _, c := range ret {
+				if c.Namespace != attributes.GetNamespace() {
+					t.Errorf("Expected %s namespace, got %s", attributes.GetNamespace(), c.Namespace)
+				}
+			}
+		}()
+
+		// concurrent calls for a different namespace use a different singleflight key
+		go func() {
+			defer wg.Done()
+			ret, err := handler.GetLimitRanges(attributesTest1)
+			if err != nil {
+				t.Errorf("unexpected error: %v", err)
+			}
+			for _, c := range ret {
+				if c.Namespace != attributesTest1.GetNamespace() {
+					t.Errorf("Expected %s namespace, got %s", attributesTest1.GetNamespace(), c.Namespace)
+				}
+			}
+		}()
+	}
+	// release the held List calls: each send unblocks one in-flight List, one per namespace
+	unhold <- struct{}{}
+	go func() {
+		unhold <- struct{}{}
+	}()
+
+	// wait for all the goroutines to finish
+	wg.Wait()
+	// calls for the same namespace are held back and must be merged by the singleflight group;
+	// two distinct namespaces were requested,
+	// hence exactly 2 List calls are expected
+	if got := atomic.LoadInt64(&count); got != 2 {
+		t.Errorf("Expected 2 List calls, got %d", got)
+	}
+
+	// invalidate the cache
+	handler.liveLookupCache.Remove(attributes.GetNamespace())
+	go func() {
+		// this send blocks until the List issued by GetLimitRanges below reaches the reactor
+		unhold <- struct{}{}
+	}()
+	_, err = handler.GetLimitRanges(attributes)
+	if err != nil {
+		t.Errorf("unexpected error: %v", err)
+	}
+	close(unhold)
+
+	if got := atomic.LoadInt64(&count); got != 3 {
+		t.Errorf("Expected 3 List calls, got %d", got)
+	}
+}