mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 11:50:44 +00:00
Merge pull request #126467 from serathius/fallback
Implement fallback for consistent reads from cache
This commit is contained in:
commit
974f3d3d8f
@ -848,7 +848,8 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
|
||||
preparedKey += "/"
|
||||
}
|
||||
requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress)
|
||||
if resourceVersion == "" && utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) && requestWatchProgressSupported {
|
||||
consistentRead := resourceVersion == "" && utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) && requestWatchProgressSupported
|
||||
if consistentRead {
|
||||
listRV, err = storage.GetCurrentResourceVersionFromStorage(ctx, c.storage, c.newListFunc, c.resourcePrefix, c.objectType.String())
|
||||
if err != nil {
|
||||
return err
|
||||
@ -887,9 +888,24 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
|
||||
}
|
||||
|
||||
objs, readResourceVersion, indexUsed, err := c.listItems(ctx, listRV, preparedKey, pred, recursive)
|
||||
success := "true"
|
||||
fallback := "false"
|
||||
if err != nil {
|
||||
if consistentRead {
|
||||
if storage.IsTooLargeResourceVersion(err) {
|
||||
fallback = "true"
|
||||
err = c.storage.GetList(ctx, key, opts, listObj)
|
||||
}
|
||||
if err != nil {
|
||||
success = "false"
|
||||
}
|
||||
metrics.ConsistentReadTotal.WithLabelValues(c.resourcePrefix, success, fallback).Add(1)
|
||||
}
|
||||
return err
|
||||
}
|
||||
if consistentRead {
|
||||
metrics.ConsistentReadTotal.WithLabelValues(c.resourcePrefix, success, fallback).Add(1)
|
||||
}
|
||||
span.AddEvent("Listed items from cache", attribute.Int("count", len(objs)))
|
||||
// store pointer of eligible objects,
|
||||
// Why not directly put object in the items of listObj?
|
||||
|
@ -24,6 +24,7 @@ import (
|
||||
"reflect"
|
||||
goruntime "runtime"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
@ -45,10 +46,13 @@ import (
|
||||
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
|
||||
"k8s.io/apiserver/pkg/features"
|
||||
"k8s.io/apiserver/pkg/storage"
|
||||
"k8s.io/apiserver/pkg/storage/cacher/metrics"
|
||||
etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing"
|
||||
etcdfeature "k8s.io/apiserver/pkg/storage/feature"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
featuregatetesting "k8s.io/component-base/featuregate/testing"
|
||||
k8smetrics "k8s.io/component-base/metrics"
|
||||
"k8s.io/component-base/metrics/testutil"
|
||||
"k8s.io/utils/clock"
|
||||
testingclock "k8s.io/utils/clock/testing"
|
||||
"k8s.io/utils/pointer"
|
||||
@ -288,6 +292,138 @@ func testGetListCacheBypass(t *testing.T, options storage.ListOptions, expectByp
|
||||
}
|
||||
}
|
||||
|
||||
func TestConsistentReadFallback(t *testing.T) {
|
||||
tcs := []struct {
|
||||
name string
|
||||
consistentReadsEnabled bool
|
||||
watchCacheRV string
|
||||
storageRV string
|
||||
fallbackError bool
|
||||
|
||||
expectError bool
|
||||
expectRV string
|
||||
expectBlock bool
|
||||
expectRequestsToStorage int
|
||||
expectMetric string
|
||||
}{
|
||||
{
|
||||
name: "Success",
|
||||
consistentReadsEnabled: true,
|
||||
watchCacheRV: "42",
|
||||
storageRV: "42",
|
||||
expectRV: "42",
|
||||
expectRequestsToStorage: 1,
|
||||
expectMetric: `
|
||||
# HELP apiserver_watch_cache_consistent_read_total [ALPHA] Counter for consistent reads from cache.
|
||||
# TYPE apiserver_watch_cache_consistent_read_total counter
|
||||
apiserver_watch_cache_consistent_read_total{fallback="false", resource="pods", success="true"} 1
|
||||
`,
|
||||
},
|
||||
{
|
||||
name: "Fallback",
|
||||
consistentReadsEnabled: true,
|
||||
watchCacheRV: "2",
|
||||
storageRV: "42",
|
||||
expectRV: "42",
|
||||
expectBlock: true,
|
||||
expectRequestsToStorage: 2,
|
||||
expectMetric: `
|
||||
# HELP apiserver_watch_cache_consistent_read_total [ALPHA] Counter for consistent reads from cache.
|
||||
# TYPE apiserver_watch_cache_consistent_read_total counter
|
||||
apiserver_watch_cache_consistent_read_total{fallback="true", resource="pods", success="true"} 1
|
||||
`,
|
||||
},
|
||||
{
|
||||
name: "Fallback Failure",
|
||||
consistentReadsEnabled: true,
|
||||
watchCacheRV: "2",
|
||||
storageRV: "42",
|
||||
fallbackError: true,
|
||||
expectError: true,
|
||||
expectBlock: true,
|
||||
expectRequestsToStorage: 2,
|
||||
expectMetric: `
|
||||
# HELP apiserver_watch_cache_consistent_read_total [ALPHA] Counter for consistent reads from cache.
|
||||
# TYPE apiserver_watch_cache_consistent_read_total counter
|
||||
apiserver_watch_cache_consistent_read_total{fallback="true", resource="pods", success="false"} 1
|
||||
`,
|
||||
},
|
||||
{
|
||||
name: "Disabled",
|
||||
watchCacheRV: "2",
|
||||
storageRV: "42",
|
||||
expectRV: "42",
|
||||
expectRequestsToStorage: 1,
|
||||
expectMetric: ``,
|
||||
},
|
||||
}
|
||||
for _, tc := range tcs {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, tc.consistentReadsEnabled)
|
||||
if tc.consistentReadsEnabled {
|
||||
forceRequestWatchProgressSupport(t)
|
||||
}
|
||||
|
||||
registry := k8smetrics.NewKubeRegistry()
|
||||
metrics.ConsistentReadTotal.Reset()
|
||||
if err := registry.Register(metrics.ConsistentReadTotal); err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
backingStorage := &dummyStorage{}
|
||||
backingStorage.getListFn = func(_ context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
|
||||
podList := listObj.(*example.PodList)
|
||||
podList.ResourceVersion = tc.watchCacheRV
|
||||
return nil
|
||||
}
|
||||
// TODO: Use fake clock for this test to reduce execution time.
|
||||
cacher, _, err := newTestCacher(backingStorage)
|
||||
if err != nil {
|
||||
t.Fatalf("Couldn't create cacher: %v", err)
|
||||
}
|
||||
defer cacher.Stop()
|
||||
|
||||
if fmt.Sprintf("%d", cacher.watchCache.resourceVersion) != tc.watchCacheRV {
|
||||
t.Fatalf("Expected watch cache RV to equal watchCacheRV, got: %d, want: %s", cacher.watchCache.resourceVersion, tc.watchCacheRV)
|
||||
}
|
||||
requestToStorageCount := 0
|
||||
backingStorage.getListFn = func(_ context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
|
||||
requestToStorageCount += 1
|
||||
podList := listObj.(*example.PodList)
|
||||
if key == cacher.resourcePrefix {
|
||||
podList.ResourceVersion = tc.storageRV
|
||||
return nil
|
||||
}
|
||||
if tc.fallbackError {
|
||||
return errDummy
|
||||
}
|
||||
podList.ResourceVersion = tc.storageRV
|
||||
return nil
|
||||
}
|
||||
result := &example.PodList{}
|
||||
start := cacher.clock.Now()
|
||||
err = cacher.GetList(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: ""}, result)
|
||||
duration := cacher.clock.Since(start)
|
||||
if (err != nil) != tc.expectError {
|
||||
t.Fatalf("Unexpected error err: %v", err)
|
||||
}
|
||||
if result.ResourceVersion != tc.expectRV {
|
||||
t.Fatalf("Unexpected List response RV, got: %q, want: %q", result.ResourceVersion, tc.expectRV)
|
||||
}
|
||||
if requestToStorageCount != tc.expectRequestsToStorage {
|
||||
t.Fatalf("Unexpected number of requests to storage, got: %d, want: %d", requestToStorageCount, tc.expectRequestsToStorage)
|
||||
}
|
||||
blocked := duration >= blockTimeout
|
||||
if blocked != tc.expectBlock {
|
||||
t.Fatalf("Unexpected block, got: %v, want: %v", blocked, tc.expectBlock)
|
||||
}
|
||||
|
||||
if err := testutil.GatherAndCompare(registry, strings.NewReader(tc.expectMetric), "apiserver_watch_cache_consistent_read_total"); err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetListNonRecursiveCacheBypass(t *testing.T) {
|
||||
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, false)
|
||||
backingStorage := &dummyStorage{}
|
||||
|
@ -167,6 +167,15 @@ var (
|
||||
StabilityLevel: compbasemetrics.ALPHA,
|
||||
Buckets: []float64{0.005, 0.025, 0.05, 0.1, 0.2, 0.4, 0.6, 0.8, 1.0, 1.25, 1.5, 2, 3},
|
||||
}, []string{"resource"})
|
||||
|
||||
ConsistentReadTotal = compbasemetrics.NewCounterVec(
|
||||
&compbasemetrics.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystem,
|
||||
Name: "consistent_read_total",
|
||||
Help: "Counter for consistent reads from cache.",
|
||||
StabilityLevel: compbasemetrics.ALPHA,
|
||||
}, []string{"resource", "success", "fallback"})
|
||||
)
|
||||
|
||||
var registerMetrics sync.Once
|
||||
@ -188,6 +197,7 @@ func Register() {
|
||||
legacyregistry.MustRegister(WatchCacheCapacity)
|
||||
legacyregistry.MustRegister(WatchCacheInitializations)
|
||||
legacyregistry.MustRegister(WatchCacheReadWait)
|
||||
legacyregistry.MustRegister(ConsistentReadTotal)
|
||||
})
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user