Merge pull request #124163 from flavianmissi/resource-quota-single-flight

resourcequota: use singleflight.Group to reduce apiserver load
This commit is contained in:
Kubernetes Prow Robot 2024-04-18 03:23:53 -07:00 committed by GitHub
commit 7b33887879
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 138 additions and 13 deletions

View File

@ -21,6 +21,7 @@ import (
"fmt"
"time"
"golang.org/x/sync/singleflight"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
@ -51,6 +52,7 @@ type quotaAccessor struct {
// This lets us handle the case of latent caches, by looking up actual results for a namespace on cache miss/no results.
// We track the lookup result here so that for repeated requests, we don't look it up very often.
liveLookupCache *lru.Cache
group singleflight.Group
liveTTL time.Duration
// updatedQuotas holds a cache of quotas that we've updated. This is used to pull the "really latest" during back to
// back quota evaluations that touch the same quota doc. This only works because we can compare etcd resourceVersions
@ -114,21 +116,23 @@ func (e *quotaAccessor) GetQuotas(namespace string) ([]corev1.ResourceQuota, err
if len(items) == 0 {
lruItemObj, ok := e.liveLookupCache.Get(namespace)
if !ok || lruItemObj.(liveLookupEntry).expiry.Before(time.Now()) {
// TODO: If there are multiple operations at the same time and cache has just expired,
// this may cause multiple List operations being issued at the same time.
// If there is already in-flight List() for a given namespace, we should wait until
// it is finished and cache is updated instead of doing the same, also to avoid
// throttling - see #22422 for details.
liveList, err := e.client.CoreV1().ResourceQuotas(namespace).List(context.TODO(), metav1.ListOptions{})
// use singleflight.Group to avoid flooding the apiserver with repeated
// requests. See #22422 for details.
lruItemObj, err, _ = e.group.Do(namespace, func() (interface{}, error) {
liveList, err := e.client.CoreV1().ResourceQuotas(namespace).List(context.TODO(), metav1.ListOptions{})
if err != nil {
return nil, err
}
newEntry := liveLookupEntry{expiry: time.Now().Add(e.liveTTL)}
for i := range liveList.Items {
newEntry.items = append(newEntry.items, &liveList.Items[i])
}
e.liveLookupCache.Add(namespace, newEntry)
return newEntry, nil
})
if err != nil {
return nil, err
}
newEntry := liveLookupEntry{expiry: time.Now().Add(e.liveTTL)}
for i := range liveList.Items {
newEntry.items = append(newEntry.items, &liveList.Items[i])
}
e.liveLookupCache.Add(namespace, newEntry)
lruItemObj = newEntry
}
lruEntry := lruItemObj.(liveLookupEntry)
for i := range lruEntry.items {

View File

@ -17,7 +17,10 @@ limitations under the License.
package resourcequota
import (
"fmt"
"reflect"
"sync"
"sync/atomic"
"testing"
"time"
@ -27,6 +30,7 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
core "k8s.io/client-go/testing"
"k8s.io/utils/lru"
)
@ -121,5 +125,122 @@ func TestLRUCacheLookup(t *testing.T) {
}
})
}
}
// TestGetQuotas ensures we do not have multiple LIST calls to the apiserver
// in-flight at any one time, so the issue described in #22422 does not
// happen again.
func TestGetQuotas(t *testing.T) {
	var (
		testNamespace1              = "test-a"
		testNamespace2              = "test-b"
		listCallCountTestNamespace1 int64
		listCallCountTestNamespace2 int64
	)
	resourceQuota := &corev1.ResourceQuota{
		ObjectMeta: metav1.ObjectMeta{
			Name: "foo",
		},
	}
	resourceQuotas := []*corev1.ResourceQuota{resourceQuota}

	kubeClient := &fake.Clientset{}
	informerFactory := informers.NewSharedInformerFactory(kubeClient, 0)

	accessor, err := newQuotaAccessor()
	if err != nil {
		t.Fatalf("unexpected error creating the accessor: %v", err)
	}
	accessor.client = kubeClient
	accessor.lister = informerFactory.Core().V1().ResourceQuotas().Lister()

	// Count LIST calls per namespace and serve a canned quota list stamped
	// with the requested namespace.
	kubeClient.AddReactor("list", "resourcequotas", func(action core.Action) (bool, runtime.Object, error) {
		switch action.GetNamespace() {
		case testNamespace1:
			atomic.AddInt64(&listCallCountTestNamespace1, 1)
		case testNamespace2:
			atomic.AddInt64(&listCallCountTestNamespace2, 1)
		default:
			t.Error("unexpected namespace")
		}

		resourceQuotaList := &corev1.ResourceQuotaList{
			ListMeta: metav1.ListMeta{
				ResourceVersion: fmt.Sprintf("%d", len(resourceQuotas)),
			},
		}
		for i, quota := range resourceQuotas {
			quota.ResourceVersion = fmt.Sprintf("%d", i)
			quota.Namespace = action.GetNamespace()
			resourceQuotaList.Items = append(resourceQuotaList.Items, *quota)
		}
		// make the handler slow so concurrent calls exercise the singleflight
		time.Sleep(time.Second)
		return true, resourceQuotaList, nil
	})

	// checkQuotas asserts GetQuotas succeeds for the given namespace and
	// that every returned quota belongs to it. Safe to call from goroutines
	// (uses t.Errorf, never t.Fatal).
	checkQuotas := func(namespace string) {
		quotas, err := accessor.GetQuotas(namespace)
		if err != nil {
			t.Errorf("unexpected error: %v", err)
		}
		if len(quotas) != len(resourceQuotas) {
			t.Errorf("Expected %d resource quotas, got %d", len(resourceQuotas), len(quotas))
		}
		for _, q := range quotas {
			if q.Namespace != namespace {
				t.Errorf("Expected %s namespace, got %s", namespace, q.Namespace)
			}
		}
	}

	wg := sync.WaitGroup{}
	for i := 0; i < 10; i++ {
		wg.Add(2)
		// simulating concurrent calls after a cache failure
		go func() {
			defer wg.Done()
			checkQuotas(testNamespace1)
		}()
		// a different namespace is a different singleflight group key, so
		// its LIST is not shared with the first namespace's
		go func() {
			defer wg.Done()
			checkQuotas(testNamespace2)
		}()
	}
	// and here we wait for all the goroutines
	wg.Wait()

	// All concurrent calls for the same namespace collapse onto a single
	// in-flight LIST via the singleflight group, so each of the two
	// namespaces should have triggered exactly one apiserver call.
	if got := atomic.LoadInt64(&listCallCountTestNamespace1); got != 1 {
		t.Errorf("Expected 1 resource quota call, got %d", got)
	}
	if got := atomic.LoadInt64(&listCallCountTestNamespace2); got != 1 {
		t.Errorf("Expected 1 resource quota call, got %d", got)
	}

	// Invalidate the cache for one namespace; the next lookup must go back
	// to the apiserver for that namespace only.
	accessor.liveLookupCache.Remove(testNamespace1)
	checkQuotas(testNamespace1)
	if got := atomic.LoadInt64(&listCallCountTestNamespace1); got != 2 {
		t.Errorf("Expected 2 resource quota calls, got %d", got)
	}
	if got := atomic.LoadInt64(&listCallCountTestNamespace2); got != 1 {
		t.Errorf("Expected 1 resource quota call, got %d", got)
	}
}