mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-26 21:17:23 +00:00
Merge pull request #130775 from serathius/watchcache-consistent-read
Fix detecting consistent read when watchcache starts handling continue
This commit is contained in:
commit
54e7d2760d
@ -176,7 +176,8 @@ func (c *CacheDelegator) Get(ctx context.Context, key string, opts storage.GetOp
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *CacheDelegator) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
|
func (c *CacheDelegator) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
|
||||||
if shouldDelegateList(opts) {
|
shouldDelegate, consistentRead := shouldDelegateList(opts)
|
||||||
|
if shouldDelegate {
|
||||||
return c.storage.GetList(ctx, key, opts, listObj)
|
return c.storage.GetList(ctx, key, opts, listObj)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -198,8 +199,6 @@ func (c *CacheDelegator) GetList(ctx context.Context, key string, opts storage.L
|
|||||||
return c.storage.GetList(ctx, key, opts, listObj)
|
return c.storage.GetList(ctx, key, opts, listObj)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress)
|
|
||||||
consistentRead := opts.ResourceVersion == "" && utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) && requestWatchProgressSupported
|
|
||||||
if consistentRead {
|
if consistentRead {
|
||||||
listRV, err = c.storage.GetCurrentResourceVersion(ctx)
|
listRV, err = c.storage.GetCurrentResourceVersion(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -243,31 +242,33 @@ func shouldDelegateListOnNotReadyCache(opts storage.ListOptions) bool {
|
|||||||
// NOTICE: Keep in sync with shouldListFromStorage function in
|
// NOTICE: Keep in sync with shouldListFromStorage function in
|
||||||
//
|
//
|
||||||
// staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/list_work_estimator.go
|
// staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/list_work_estimator.go
|
||||||
func shouldDelegateList(opts storage.ListOptions) bool {
|
func shouldDelegateList(opts storage.ListOptions) (shouldDeletage, consistentRead bool) {
|
||||||
// see https://kubernetes.io/docs/reference/using-api/api-concepts/#semantics-for-get-and-list
|
// see https://kubernetes.io/docs/reference/using-api/api-concepts/#semantics-for-get-and-list
|
||||||
|
consistentRead = false
|
||||||
switch opts.ResourceVersionMatch {
|
switch opts.ResourceVersionMatch {
|
||||||
case metav1.ResourceVersionMatchExact:
|
case metav1.ResourceVersionMatchExact:
|
||||||
return true
|
return true, consistentRead
|
||||||
case metav1.ResourceVersionMatchNotOlderThan:
|
case metav1.ResourceVersionMatchNotOlderThan:
|
||||||
return false
|
return false, consistentRead
|
||||||
case "":
|
case "":
|
||||||
// Legacy exact match
|
// Legacy exact match
|
||||||
if opts.Predicate.Limit > 0 && len(opts.ResourceVersion) > 0 && opts.ResourceVersion != "0" {
|
if opts.Predicate.Limit > 0 && len(opts.ResourceVersion) > 0 && opts.ResourceVersion != "0" {
|
||||||
return true
|
return true, consistentRead
|
||||||
}
|
}
|
||||||
// Continue
|
// Continue
|
||||||
if len(opts.Predicate.Continue) > 0 {
|
if len(opts.Predicate.Continue) > 0 {
|
||||||
return true
|
return true, consistentRead
|
||||||
}
|
}
|
||||||
// Consistent Read
|
// Consistent Read
|
||||||
if opts.ResourceVersion == "" {
|
if opts.ResourceVersion == "" {
|
||||||
|
consistentRead = true
|
||||||
consistentListFromCacheEnabled := utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache)
|
consistentListFromCacheEnabled := utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache)
|
||||||
requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress)
|
requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress)
|
||||||
return !consistentListFromCacheEnabled || !requestWatchProgressSupported
|
return !consistentListFromCacheEnabled || !requestWatchProgressSupported, consistentRead
|
||||||
}
|
}
|
||||||
return false
|
return false, consistentRead
|
||||||
default:
|
default:
|
||||||
return true
|
return true, consistentRead
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -19,7 +19,6 @@ package request
|
|||||||
import (
|
import (
|
||||||
"math"
|
"math"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
|
||||||
|
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
@ -86,8 +85,8 @@ func (e *listWorkEstimator) estimate(r *http.Request, flowSchemaName, priorityLe
|
|||||||
return WorkEstimate{InitialSeats: e.config.MinimumSeats}
|
return WorkEstimate{InitialSeats: e.config.MinimumSeats}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
listFromStorage, _ := shouldListFromStorage(&listOptions)
|
||||||
isListFromCache := requestInfo.Verb == "watch" || !shouldListFromStorage(query, &listOptions)
|
isListFromCache := requestInfo.Verb == "watch" || !listFromStorage
|
||||||
|
|
||||||
numStored, err := e.countGetterFn(key(requestInfo))
|
numStored, err := e.countGetterFn(key(requestInfo))
|
||||||
switch {
|
switch {
|
||||||
@ -163,30 +162,32 @@ func key(requestInfo *apirequest.RequestInfo) string {
|
|||||||
// NOTICE: Keep in sync with shouldDelegateList function in
|
// NOTICE: Keep in sync with shouldDelegateList function in
|
||||||
//
|
//
|
||||||
// staging/src/k8s.io/apiserver/pkg/storage/cacher/delegator.go
|
// staging/src/k8s.io/apiserver/pkg/storage/cacher/delegator.go
|
||||||
func shouldListFromStorage(query url.Values, opts *metav1.ListOptions) bool {
|
func shouldListFromStorage(opts *metav1.ListOptions) (shouldDeletage, consistentRead bool) {
|
||||||
// see https://kubernetes.io/docs/reference/using-api/api-concepts/#semantics-for-get-and-list
|
// see https://kubernetes.io/docs/reference/using-api/api-concepts/#semantics-for-get-and-list
|
||||||
|
consistentRead = false
|
||||||
switch opts.ResourceVersionMatch {
|
switch opts.ResourceVersionMatch {
|
||||||
case metav1.ResourceVersionMatchExact:
|
case metav1.ResourceVersionMatchExact:
|
||||||
return true
|
return true, consistentRead
|
||||||
case metav1.ResourceVersionMatchNotOlderThan:
|
case metav1.ResourceVersionMatchNotOlderThan:
|
||||||
return false
|
return false, consistentRead
|
||||||
case "":
|
case "":
|
||||||
// Legacy exact match
|
// Legacy exact match
|
||||||
if opts.Limit > 0 && len(opts.ResourceVersion) > 0 && opts.ResourceVersion != "0" {
|
if opts.Limit > 0 && len(opts.ResourceVersion) > 0 && opts.ResourceVersion != "0" {
|
||||||
return true
|
return true, consistentRead
|
||||||
}
|
}
|
||||||
// Continue
|
// Continue
|
||||||
if len(opts.Continue) > 0 {
|
if len(opts.Continue) > 0 {
|
||||||
return true
|
return true, consistentRead
|
||||||
}
|
}
|
||||||
// Consistent Read
|
// Consistent Read
|
||||||
if opts.ResourceVersion == "" {
|
if opts.ResourceVersion == "" {
|
||||||
|
consistentRead = true
|
||||||
consistentListFromCacheEnabled := utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache)
|
consistentListFromCacheEnabled := utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache)
|
||||||
requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress)
|
requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress)
|
||||||
return !consistentListFromCacheEnabled || !requestWatchProgressSupported
|
return !consistentListFromCacheEnabled || !requestWatchProgressSupported, consistentRead
|
||||||
}
|
}
|
||||||
return false
|
return false, consistentRead
|
||||||
default:
|
default:
|
||||||
return true
|
return true, consistentRead
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user