Mirror of https://github.com/k3s-io/kubernetes.git, synced 2025-07-19 09:52:49 +00:00
Fix - Consistent paginated lists serve from cache
This commit is contained in:
parent 83613fcfd5
commit d4a4a1d881
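The diff touches two places: the cacher tests gain coverage for consistent reads served from the watch cache, and CacheDelegator.GetList keeps a copy of the caller's original options for fallbacks while requesting NotOlderThan semantics only when no continue token is present. As rough orientation, here is a minimal, self-contained Go sketch of that option rewrite; the listOptions type and the prepareConsistentRead helper are illustrative stand-ins for the real storage.ListOptions handling, not the actual apiserver code.

// Illustrative sketch only: listOptions and prepareConsistentRead are hypothetical
// stand-ins for the option handling in CacheDelegator.GetList shown in the diff below.
package main

import (
	"fmt"
	"strconv"
)

type listOptions struct {
	ResourceVersion      string
	ResourceVersionMatch string // "" or "NotOlderThan"
	Continue             string
}

// prepareConsistentRead stamps the current etcd resource version onto the cache
// request and keeps an untouched copy of the caller's options for any storage
// fallback. NotOlderThan is only requested when there is no continue token,
// because a continuation must be served at the exact RV encoded in the token.
func prepareConsistentRead(opts listOptions, currentRV uint64) (cacheOpts, fallbackOpts listOptions) {
	fallbackOpts = opts
	opts.ResourceVersion = strconv.FormatUint(currentRV, 10)
	if opts.Continue == "" {
		opts.ResourceVersionMatch = "NotOlderThan"
	}
	return opts, fallbackOpts
}

func main() {
	cacheOpts, fallbackOpts := prepareConsistentRead(listOptions{}, 100)
	fmt.Printf("cache opts: %+v\nfallback opts: %+v\n", cacheOpts, fallbackOpts)
}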
@@ -662,6 +662,7 @@ func TestMatchExactResourceVersionFallback(t *testing.T) {
 	}
 	for _, tc := range tcs {
 		t.Run(tc.name, func(t *testing.T) {
+			featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ListFromCacheSnapshot, true)
 			backingStorage := &dummyStorage{}
 			expectStoreRequests := 0
 			backingStorage.getListFn = func(_ context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
@@ -759,6 +760,125 @@ func TestGetListNonRecursiveCacheBypass(t *testing.T) {
 	}
 }
 
+func TestGetListNonRecursiveCacheWithConsistentListFromCache(t *testing.T) {
+	// Set feature gates once at the beginning since we only care about ConsistentListFromCache=true and ListFromCacheSnapshot=false
+	featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, true)
+	featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ListFromCacheSnapshot, false)
+	forceRequestWatchProgressSupport(t)
+
+	tests := []struct {
+		name                    string
+		consistentListFromCache bool
+		expectGetListCallCount  int
+		expectGetCurrentRV      bool
+		injectRVError           bool
+		expectedError           error
+	}{
+		{
+			name:                    "ConsistentListFromCache enabled - served from cache",
+			consistentListFromCache: true,
+			expectGetListCallCount:  1,
+			expectGetCurrentRV:      true,
+			injectRVError:           false,
+			expectedError:           nil,
+		},
+	}
+
+	for _, tc := range tests {
+		t.Run(tc.name, func(t *testing.T) {
+			var getListCount, getCurrentRVCount int
+			backingStorage := &dummyStorage{}
+
+			backingStorage.getListFn = func(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
+				getListCount++
+				if tc.injectRVError {
+					return errDummy
+				}
+				podList := listObj.(*example.PodList)
+				podList.ListMeta = metav1.ListMeta{ResourceVersion: "100"}
+				return nil
+			}
+
+			backingStorage.getRVFn = func(ctx context.Context) (uint64, error) {
+				getCurrentRVCount++
+				rv := uint64(100)
+				err := error(nil)
+				if tc.injectRVError {
+					err = errDummy
+					return 0, err
+				}
+				return rv, nil
+			}
+
+			cacher, v, err := newTestCacher(backingStorage)
+			if err != nil {
+				t.Fatalf("Couldn't create cacher: %v", err)
+			}
+			defer cacher.Stop()
+
+			// Wait for cacher to be ready before injecting errors
+			if err := cacher.ready.wait(context.Background()); err != nil {
+				t.Fatalf("unexpected error waiting for the cache to be ready: %v", err)
+			}
+			delegator := NewCacheDelegator(cacher, backingStorage)
+			defer delegator.Stop()
+
+			// Setup test object
+			key := "pods/ns"
+			input := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "ns"}}
+			if err := v.UpdateObject(input, 100); err != nil {
+				t.Fatalf("Unexpected error: %v", err)
+			}
+
+			// Put object into the store
+			if err := cacher.watchCache.Add(input); err != nil {
+				t.Fatalf("Unexpected error: %v", err)
+			}
+
+			pred := storage.SelectionPredicate{
+				Label: labels.Everything(),
+				Field: fields.Everything(),
+				Limit: 500,
+			}
+			result := &example.PodList{}
+
+			// Make the list call with empty RV - delegator will get current RV and use it
+			err = delegator.GetList(context.TODO(), key, storage.ListOptions{
+				ResourceVersion: "",
+				Predicate:       pred,
+				Recursive:       true,
+			}, result)
+
+			// Verify error matches expectation
+			if !errors.Is(err, tc.expectedError) {
+				t.Errorf("Expected error %v, got: %v", tc.expectedError, err)
+			}
+
+			// Verify the correct storage method was called
+			if getListCount != tc.expectGetListCallCount {
+				t.Errorf("Expected GetList to be called %d times, but it was called %d times", tc.expectGetListCallCount, getListCount)
+			}
+			if tc.expectGetCurrentRV && getCurrentRVCount == 0 {
+				t.Error("Expected GetCurrentResourceVersion to be called, but it wasn't")
+			}
+			if !tc.expectGetCurrentRV && getCurrentRVCount > 0 {
+				t.Errorf("Expected GetCurrentResourceVersion not to be called, but it was called %d times", getCurrentRVCount)
+			}
+
+			// For successful cache reads, verify the resource version
+			if err == nil {
+				resultRV, err := cacher.versioner.ParseResourceVersion(result.ResourceVersion)
+				if err != nil {
+					t.Fatalf("Failed to parse result resource version: %v", err)
+				}
+				expectedRV := uint64(100)
+				if resultRV != expectedRV {
+					t.Errorf("Expected RV %d but got %d", expectedRV, resultRV)
+				}
+			}
+		})
+	}
+}
 func TestGetCacheBypass(t *testing.T) {
 	backingStorage := &dummyStorage{}
 	cacher, _, err := newTestCacher(backingStorage)
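The test above drives everything through dummyStorage, swapping in getListFn and getRVFn per case to count calls and inject failures. For readers unfamiliar with that style, here is a generic, self-contained sketch of the same function-field stub pattern; the storageHooks type and its methods are hypothetical stand-ins, not the apiserver's dummyStorage.

// Generic illustration of a function-field stub: each test assigns the hooks it
// needs, counts calls, and injects errors without a full storage implementation.
package main

import (
	"errors"
	"fmt"
)

type storageHooks struct {
	getListFn func(key string) error
	getRVFn   func() (uint64, error)
}

func (s *storageHooks) GetList(key string) error {
	if s.getListFn != nil {
		return s.getListFn(key)
	}
	return nil
}

func (s *storageHooks) GetCurrentResourceVersion() (uint64, error) {
	if s.getRVFn != nil {
		return s.getRVFn()
	}
	return 0, errors.New("not configured")
}

func main() {
	var getListCount int
	stub := &storageHooks{
		getListFn: func(key string) error { getListCount++; return nil },
		getRVFn:   func() (uint64, error) { return 100, nil },
	}
	_ = stub.GetList("pods/ns")
	rv, _ := stub.GetCurrentResourceVersion()
	fmt.Println(getListCount, rv) // 1 100
}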
@@ -206,6 +206,7 @@ func (c *CacheDelegator) GetList(ctx context.Context, key string, opts storage.L
 			return c.storage.GetList(ctx, key, opts, listObj)
 		}
 	}
+	fallbackOpts := opts
 	if result.ConsistentRead {
 		listRV, err = c.storage.GetCurrentResourceVersion(ctx)
 		if err != nil {
@@ -213,20 +214,28 @@ func (c *CacheDelegator) GetList(ctx context.Context, key string, opts storage.L
 		}
 		// Setting resource version for consistent read in cache based on current ResourceVersion in etcd.
 		opts.ResourceVersion = strconv.FormatInt(int64(listRV), 10)
+		// If continue is not set, we need to set the resource version match to ResourceVersionMatchNotOlderThan to serve latest from cache
+		if opts.Predicate.Continue == "" {
+			opts.ResourceVersionMatch = metav1.ResourceVersionMatchNotOlderThan
+		}
 	}
 	err = c.cacher.GetList(ctx, key, opts, listObj)
 	success := "true"
 	fallback := "false"
 	if err != nil {
-		if errors.IsResourceExpired(err) {
-			return c.storage.GetList(ctx, key, opts, listObj)
+		// ResourceExpired error occurs when attempting to list from cache with a specific resourceVersion
+		// that is no longer available in the cache. With ListFromCacheSnapshot feature (1.34+), we can
+		// serve exact resourceVersion requests from cache if available, falling back to storage only when
+		// the requested version is expired.
+		if errors.IsResourceExpired(err) && utilfeature.DefaultFeatureGate.Enabled(features.ListFromCacheSnapshot) {
+			return c.storage.GetList(ctx, key, fallbackOpts, listObj)
 		}
 		if result.ConsistentRead {
+			// IsTooLargeResourceVersion occurs when the requested RV is higher than cache's current RV
+			// and cache hasn't caught up within the timeout period. Fall back to etcd.
 			if storage.IsTooLargeResourceVersion(err) {
 				fallback = "true"
-				// Reset resourceVersion during fallback from consistent read.
-				opts.ResourceVersion = ""
-				err = c.storage.GetList(ctx, key, opts, listObj)
+				err = c.storage.GetList(ctx, key, fallbackOpts, listObj)
 			}
 			if err != nil {
 				success = "false"
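Read together, the delegator hunks leave two fallback-to-storage paths, and both now retry with fallbackOpts, the options captured before the consistent-read rewrite, rather than the mutated opts. Below is a simplified, self-contained sketch of that decision; chooseFallback, listError, and the constants are hypothetical names used only for illustration, not part of CacheDelegator.

// Simplified model of the fallback decision encoded in the hunk above.
package main

import "fmt"

type listError int

const (
	errNone listError = iota
	errResourceExpired
	errTooLargeResourceVersion
)

// chooseFallback reports whether the list should be retried against the backing
// storage and, if so, that the retry must use the pre-rewrite options.
func chooseFallback(err listError, consistentRead, listFromCacheSnapshot bool) (fallback bool, usePreRewriteOpts bool) {
	switch {
	case err == errResourceExpired && listFromCacheSnapshot:
		return true, true
	case consistentRead && err == errTooLargeResourceVersion:
		return true, true
	default:
		return false, false
	}
}

func main() {
	fmt.Println(chooseFallback(errTooLargeResourceVersion, true, false)) // true true
	fmt.Println(chooseFallback(errResourceExpired, false, true))         // true true
	fmt.Println(chooseFallback(errResourceExpired, false, false))        // false false
}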