Fixed: #22422 use singleflight to alleviate simultaneous calls to the apiserver

Signed-off-by: aimuz <mr.imuz@gmail.com>
aimuz 2022-09-23 15:32:14 +08:00
parent 0a689af469
commit bd441d0a58
No known key found for this signature in database
GPG Key ID: 63C3DC9FBA22D9D7
2 changed files with 118 additions and 13 deletions

plugin/pkg/admission/limitranger/admission.go

@@ -24,6 +24,7 @@ import (
 	"strings"
 	"time"
+	"golang.org/x/sync/singleflight"
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/meta"
 	"k8s.io/apimachinery/pkg/api/resource"
@@ -64,6 +65,7 @@ type LimitRanger struct {
 	// This let's us handle the case of latent caches, by looking up actual results for a namespace on cache miss/no results.
 	// We track the lookup result here so that for repeated requests, we don't look it up very often.
 	liveLookupCache *lru.Cache
+	group           singleflight.Group
 	liveTTL         time.Duration
 }
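An aside on the new field (a property of the library, not something this patch sets up): the zero value of singleflight.Group is ready to use, which is why no constructor or initialization change appears anywhere in the diff. A minimal check:

package main

import (
	"fmt"

	"golang.org/x/sync/singleflight"
)

func main() {
	var group singleflight.Group // zero value, never explicitly initialized
	v, err, _ := group.Do("key", func() (interface{}, error) {
		return "result", nil
	})
	fmt.Println(v, err) // result <nil>
}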
@@ -161,21 +163,23 @@ func (l *LimitRanger) GetLimitRanges(a admission.Attributes) ([]*corev1.LimitRange, error) {
 	if len(items) == 0 {
 		lruItemObj, ok := l.liveLookupCache.Get(a.GetNamespace())
 		if !ok || lruItemObj.(liveLookupEntry).expiry.Before(time.Now()) {
-			// TODO: If there are multiple operations at the same time and cache has just expired,
-			// this may cause multiple List operations being issued at the same time.
-			// If there is already in-flight List() for a given namespace, we should wait until
-			// it is finished and cache is updated instead of doing the same, also to avoid
-			// throttling - see #22422 for details.
-			liveList, err := l.client.CoreV1().LimitRanges(a.GetNamespace()).List(context.TODO(), metav1.ListOptions{})
-			if err != nil {
-				return nil, admission.NewForbidden(a, err)
-			}
-			newEntry := liveLookupEntry{expiry: time.Now().Add(l.liveTTL)}
-			for i := range liveList.Items {
-				newEntry.items = append(newEntry.items, &liveList.Items[i])
-			}
-			l.liveLookupCache.Add(a.GetNamespace(), newEntry)
-			lruItemObj = newEntry
+			// Fixed: #22422
+			// use singleflight to alleviate simultaneous calls to the apiserver
+			lruItemObj, err, _ = l.group.Do(a.GetNamespace(), func() (interface{}, error) {
+				liveList, err := l.client.CoreV1().LimitRanges(a.GetNamespace()).List(context.TODO(), metav1.ListOptions{})
+				if err != nil {
+					return nil, admission.NewForbidden(a, err)
+				}
+				newEntry := liveLookupEntry{expiry: time.Now().Add(l.liveTTL)}
+				for i := range liveList.Items {
+					newEntry.items = append(newEntry.items, &liveList.Items[i])
+				}
+				l.liveLookupCache.Add(a.GetNamespace(), newEntry)
+				return newEntry, nil
+			})
+			if err != nil {
+				return nil, err
+			}
 		}
 		lruEntry := lruItemObj.(liveLookupEntry)

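For readers unfamiliar with the library: golang.org/x/sync/singleflight coalesces concurrent calls that share a key, so while one List for a namespace is in flight, every other caller for that namespace blocks and receives the same result instead of issuing its own request. A minimal, self-contained sketch of that behavior (the key, the sleep, and the fake fetch below are illustrative stand-ins, not code from this patch):

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	"golang.org/x/sync/singleflight"
)

func main() {
	var (
		group singleflight.Group
		calls int64 // how many times the expensive function actually ran
		wg    sync.WaitGroup
	)

	// Ten goroutines request the same key while the first call is still in
	// flight; singleflight runs the function once and shares the result.
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			v, _, shared := group.Do("namespace-a", func() (interface{}, error) {
				atomic.AddInt64(&calls, 1)
				time.Sleep(100 * time.Millisecond) // stand-in for a slow List call
				return "limit ranges for namespace-a", nil
			})
			fmt.Println(v, "shared:", shared)
		}()
	}
	wg.Wait()
	// Almost always 1: every goroutine joined the single in-flight call.
	fmt.Println("calls:", atomic.LoadInt64(&calls))
}

The third return value of Do reports whether the result was shared with other concurrent callers.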
plugin/pkg/admission/limitranger/admission_test.go

@@ -20,6 +20,8 @@ import (
 	"context"
 	"fmt"
 	"strconv"
+	"sync"
+	"sync/atomic"
 	"testing"
 	"time"
@@ -856,3 +858,102 @@ func TestPersistentVolumeClaimLimitFunc(t *testing.T) {
 		}
 	}
 }
+
+// TestLimitRanger_GetLimitRangesFixed22422 verifies the fix for #22422
+// (admission controllers can cause unnecessary significant load on the apiserver).
+func TestLimitRanger_GetLimitRangesFixed22422(t *testing.T) {
+	limitRange := validLimitRangeNoDefaults()
+	limitRanges := []corev1.LimitRange{limitRange}
+
+	var count int64
+	mockClient := &fake.Clientset{}
+	unhold := make(chan struct{})
+	mockClient.AddReactor("list", "limitranges", func(action core.Action) (bool, runtime.Object, error) {
+		atomic.AddInt64(&count, 1)
+		limitRangeList := &corev1.LimitRangeList{
+			ListMeta: metav1.ListMeta{
+				ResourceVersion: fmt.Sprintf("%d", len(limitRanges)),
+			},
+		}
+		for index, value := range limitRanges {
+			value.ResourceVersion = fmt.Sprintf("%d", index)
+			value.Namespace = action.GetNamespace()
+			limitRangeList.Items = append(limitRangeList.Items, value)
+		}
+		// block until the test signals, so that concurrent lookups pile up
+		<-unhold
+		return true, limitRangeList, nil
+	})
+
+	handler, _, err := newHandlerForTest(mockClient)
+	if err != nil {
+		t.Fatalf("unexpected error initializing handler: %v", err)
+	}
+
+	attributes := admission.NewAttributesRecord(nil, nil, api.Kind("kind").WithVersion("version"), "test", "name", api.Resource("resource").WithVersion("version"), "subresource", admission.Create, &metav1.CreateOptions{}, false, nil)
+	attributesTest1 := admission.NewAttributesRecord(nil, nil, api.Kind("kind").WithVersion("version"), "test1", "name", api.Resource("resource").WithVersion("version"), "subresource", admission.Create, &metav1.CreateOptions{}, false, nil)
+
+	wg := sync.WaitGroup{}
+	for i := 0; i < 10; i++ {
+		wg.Add(2)
+		// simulate concurrent lookups for the same namespace after a cache miss
+		go func() {
+			defer wg.Done()
+			ret, err := handler.GetLimitRanges(attributes)
+			if err != nil {
+				t.Errorf("unexpected error: %v", err)
+			}
+			for _, c := range ret {
+				if c.Namespace != attributes.GetNamespace() {
+					t.Errorf("Expected %s namespace, got %s", attributes.GetNamespace(), c.Namespace)
+				}
+			}
+		}()
+		// lookups for a different namespace must not be coalesced with the ones above
+		go func() {
+			defer wg.Done()
+			ret, err := handler.GetLimitRanges(attributesTest1)
+			if err != nil {
+				t.Errorf("unexpected error: %v", err)
+			}
+			for _, c := range ret {
+				if c.Namespace != attributesTest1.GetNamespace() {
+					t.Errorf("Expected %s namespace, got %s", attributesTest1.GetNamespace(), c.Namespace)
+				}
+			}
+		}()
+	}
+
+	// two List calls are now in flight, one per namespace; release them both
+	unhold <- struct{}{}
+	go func() {
+		unhold <- struct{}{}
+	}()
+	// wait for all goroutines to finish
+	wg.Wait()
+
+	// All blocked calls for the same namespace are coalesced by the singleflight
+	// group. With two distinct namespaces, exactly two List calls reach the apiserver.
+	if count != 2 {
+		t.Errorf("Expected 2 List calls, got %d", count)
+	}
+
+	// invalidate the cache
+	handler.liveLookupCache.Remove(attributes.GetNamespace())
+	go func() {
+		// release the List call triggered by the lookup below
+		unhold <- struct{}{}
+	}()
+	_, err = handler.GetLimitRanges(attributes)
+	if err != nil {
+		t.Errorf("unexpected error: %v", err)
+	}
+	close(unhold)
+
+	// a fresh lookup after cache invalidation issues one more List call
+	if count != 3 {
+		t.Errorf("Expected 3 List calls, got %d", count)
+	}
+}
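A detail the final assertions rely on (a general property of singleflight, not specific to this patch): deduplication applies only to calls that overlap in time; once a flight completes, the next Do with the same key runs the function again. That is why removing the cached entry and looking the namespace up once more raises the List count from 2 to 3. A minimal sketch:

package main

import (
	"fmt"

	"golang.org/x/sync/singleflight"
)

func main() {
	var group singleflight.Group
	calls := 0
	fetch := func() (interface{}, error) {
		calls++
		return calls, nil
	}

	// Sequential calls do not coalesce: each Do finds no flight in progress
	// for the key and runs fetch again.
	group.Do("test", fetch)
	group.Do("test", fetch)
	fmt.Println("calls:", calls) // 2
}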