mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-21 02:41:25 +00:00
Merge pull request #132337 from hakuna-matatah/automated-cherry-pick-of-#132244-upstream-release-1.33
Automated cherry pick of #132244: 1.33 regression - Consistent paginated lists serve from cache
This commit is contained in:
commit
ecc8d0ae6a
@ -662,6 +662,7 @@ func TestMatchExactResourceVersionFallback(t *testing.T) {
|
|||||||
}
|
}
|
||||||
for _, tc := range tcs {
|
for _, tc := range tcs {
|
||||||
t.Run(tc.name, func(t *testing.T) {
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ListFromCacheSnapshot, true)
|
||||||
backingStorage := &dummyStorage{}
|
backingStorage := &dummyStorage{}
|
||||||
expectStoreRequests := 0
|
expectStoreRequests := 0
|
||||||
backingStorage.getListFn = func(_ context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
|
backingStorage.getListFn = func(_ context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
|
||||||
@ -759,6 +760,125 @@ func TestGetListNonRecursiveCacheBypass(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestGetListNonRecursiveCacheWithConsistentListFromCache(t *testing.T) {
|
||||||
|
// Set feature gates once at the beginning since we only care about ConsistentListFromCache=true and ListFromCacheSnapshot=false
|
||||||
|
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, true)
|
||||||
|
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ListFromCacheSnapshot, false)
|
||||||
|
forceRequestWatchProgressSupport(t)
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
consistentListFromCache bool
|
||||||
|
expectGetListCallCount int
|
||||||
|
expectGetCurrentRV bool
|
||||||
|
injectRVError bool
|
||||||
|
expectedError error
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "ConsistentListFromCache enabled - served from cache",
|
||||||
|
consistentListFromCache: true,
|
||||||
|
expectGetListCallCount: 1,
|
||||||
|
expectGetCurrentRV: true,
|
||||||
|
injectRVError: false,
|
||||||
|
expectedError: nil,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range tests {
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
var getListCount, getCurrentRVCount int
|
||||||
|
backingStorage := &dummyStorage{}
|
||||||
|
|
||||||
|
backingStorage.getListFn = func(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
|
||||||
|
getListCount++
|
||||||
|
if tc.injectRVError {
|
||||||
|
return errDummy
|
||||||
|
}
|
||||||
|
podList := listObj.(*example.PodList)
|
||||||
|
podList.ListMeta = metav1.ListMeta{ResourceVersion: "100"}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
backingStorage.getRVFn = func(ctx context.Context) (uint64, error) {
|
||||||
|
getCurrentRVCount++
|
||||||
|
rv := uint64(100)
|
||||||
|
err := error(nil)
|
||||||
|
if tc.injectRVError {
|
||||||
|
err = errDummy
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
return rv, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
cacher, v, err := newTestCacher(backingStorage)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Couldn't create cacher: %v", err)
|
||||||
|
}
|
||||||
|
defer cacher.Stop()
|
||||||
|
|
||||||
|
// Wait for cacher to be ready before injecting errors
|
||||||
|
if err := cacher.ready.wait(context.Background()); err != nil {
|
||||||
|
t.Fatalf("unexpected error waiting for the cache to be ready: %v", err)
|
||||||
|
}
|
||||||
|
delegator := NewCacheDelegator(cacher, backingStorage)
|
||||||
|
defer delegator.Stop()
|
||||||
|
|
||||||
|
// Setup test object
|
||||||
|
key := "pods/ns"
|
||||||
|
input := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "ns"}}
|
||||||
|
if err := v.UpdateObject(input, 100); err != nil {
|
||||||
|
t.Fatalf("Unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Put object into the store
|
||||||
|
if err := cacher.watchCache.Add(input); err != nil {
|
||||||
|
t.Fatalf("Unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
pred := storage.SelectionPredicate{
|
||||||
|
Label: labels.Everything(),
|
||||||
|
Field: fields.Everything(),
|
||||||
|
Limit: 500,
|
||||||
|
}
|
||||||
|
result := &example.PodList{}
|
||||||
|
|
||||||
|
// Make the list call with empty RV - delegator will get current RV and use it
|
||||||
|
err = delegator.GetList(context.TODO(), key, storage.ListOptions{
|
||||||
|
ResourceVersion: "",
|
||||||
|
Predicate: pred,
|
||||||
|
Recursive: true,
|
||||||
|
}, result)
|
||||||
|
|
||||||
|
// Verify error matches expectation
|
||||||
|
if !errors.Is(err, tc.expectedError) {
|
||||||
|
t.Errorf("Expected error %v, got: %v", tc.expectedError, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify the correct storage method was called
|
||||||
|
if getListCount != tc.expectGetListCallCount {
|
||||||
|
t.Errorf("Expected GetList to be called %d times, but it was called %d times", tc.expectGetListCallCount, getListCount)
|
||||||
|
}
|
||||||
|
if tc.expectGetCurrentRV && getCurrentRVCount == 0 {
|
||||||
|
t.Error("Expected GetCurrentResourceVersion to be called, but it wasn't")
|
||||||
|
}
|
||||||
|
if !tc.expectGetCurrentRV && getCurrentRVCount > 0 {
|
||||||
|
t.Errorf("Expected GetCurrentResourceVersion not to be called, but it was called %d times", getCurrentRVCount)
|
||||||
|
}
|
||||||
|
|
||||||
|
// For successful cache reads, verify the resource version
|
||||||
|
if err == nil {
|
||||||
|
resultRV, err := cacher.versioner.ParseResourceVersion(result.ResourceVersion)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to parse result resource version: %v", err)
|
||||||
|
}
|
||||||
|
expectedRV := uint64(100)
|
||||||
|
if resultRV != expectedRV {
|
||||||
|
t.Errorf("Expected RV %d but got %d", expectedRV, resultRV)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
func TestGetCacheBypass(t *testing.T) {
|
func TestGetCacheBypass(t *testing.T) {
|
||||||
backingStorage := &dummyStorage{}
|
backingStorage := &dummyStorage{}
|
||||||
cacher, _, err := newTestCacher(backingStorage)
|
cacher, _, err := newTestCacher(backingStorage)
|
||||||
|
@ -206,6 +206,7 @@ func (c *CacheDelegator) GetList(ctx context.Context, key string, opts storage.L
|
|||||||
return c.storage.GetList(ctx, key, opts, listObj)
|
return c.storage.GetList(ctx, key, opts, listObj)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
fallbackOpts := opts
|
||||||
if result.ConsistentRead {
|
if result.ConsistentRead {
|
||||||
listRV, err = c.storage.GetCurrentResourceVersion(ctx)
|
listRV, err = c.storage.GetCurrentResourceVersion(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -213,20 +214,28 @@ func (c *CacheDelegator) GetList(ctx context.Context, key string, opts storage.L
|
|||||||
}
|
}
|
||||||
// Setting resource version for consistent read in cache based on current ResourceVersion in etcd.
|
// Setting resource version for consistent read in cache based on current ResourceVersion in etcd.
|
||||||
opts.ResourceVersion = strconv.FormatInt(int64(listRV), 10)
|
opts.ResourceVersion = strconv.FormatInt(int64(listRV), 10)
|
||||||
|
// If continue is not set, we need to set the resource version match to ResourceVersionMatchNotOlderThan to serve latest from cache
|
||||||
|
if opts.Predicate.Continue == "" {
|
||||||
|
opts.ResourceVersionMatch = metav1.ResourceVersionMatchNotOlderThan
|
||||||
|
}
|
||||||
}
|
}
|
||||||
err = c.cacher.GetList(ctx, key, opts, listObj)
|
err = c.cacher.GetList(ctx, key, opts, listObj)
|
||||||
success := "true"
|
success := "true"
|
||||||
fallback := "false"
|
fallback := "false"
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errors.IsResourceExpired(err) {
|
// ResourceExpired error occurs when attempting to list from cache with a specific resourceVersion
|
||||||
return c.storage.GetList(ctx, key, opts, listObj)
|
// that is no longer available in the cache. With ListFromCacheSnapshot feature (1.34+), we can
|
||||||
|
// serve exact resourceVersion requests from cache if available, falling back to storage only when
|
||||||
|
// the requested version is expired.
|
||||||
|
if errors.IsResourceExpired(err) && utilfeature.DefaultFeatureGate.Enabled(features.ListFromCacheSnapshot) {
|
||||||
|
return c.storage.GetList(ctx, key, fallbackOpts, listObj)
|
||||||
}
|
}
|
||||||
if result.ConsistentRead {
|
if result.ConsistentRead {
|
||||||
|
// IsTooLargeResourceVersion occurs when the requested RV is higher than cache's current RV
|
||||||
|
// and cache hasn't caught up within the timeout period. Fall back to etcd.
|
||||||
if storage.IsTooLargeResourceVersion(err) {
|
if storage.IsTooLargeResourceVersion(err) {
|
||||||
fallback = "true"
|
fallback = "true"
|
||||||
// Reset resourceVersion during fallback from consistent read.
|
err = c.storage.GetList(ctx, key, fallbackOpts, listObj)
|
||||||
opts.ResourceVersion = ""
|
|
||||||
err = c.storage.GetList(ctx, key, opts, listObj)
|
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
success = "false"
|
success = "false"
|
||||||
|
Loading…
Reference in New Issue
Block a user