mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-03 01:06:27 +00:00
Enable progress notify events in watchcache
This commit is contained in:
parent
4af1328bb8
commit
a94fb5369d
@ -138,6 +138,12 @@ const (
|
|||||||
//
|
//
|
||||||
// Allows sending warning headers in API responses.
|
// Allows sending warning headers in API responses.
|
||||||
WarningHeaders featuregate.Feature = "WarningHeaders"
|
WarningHeaders featuregate.Feature = "WarningHeaders"
|
||||||
|
|
||||||
|
// owner: @wojtek-t
|
||||||
|
// alpha: v1.20
|
||||||
|
//
|
||||||
|
// Allows for updating watchcache resource version with progress notify events.
|
||||||
|
EfficientWatchResumption featuregate.Feature = "EfficientWatchResumption"
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
@ -148,18 +154,19 @@ func init() {
|
|||||||
// To add a new feature, define a key for it above and add it here. The features will be
|
// To add a new feature, define a key for it above and add it here. The features will be
|
||||||
// available throughout Kubernetes binaries.
|
// available throughout Kubernetes binaries.
|
||||||
var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{
|
var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{
|
||||||
StreamingProxyRedirects: {Default: true, PreRelease: featuregate.Deprecated},
|
StreamingProxyRedirects: {Default: true, PreRelease: featuregate.Deprecated},
|
||||||
ValidateProxyRedirects: {Default: true, PreRelease: featuregate.Beta},
|
ValidateProxyRedirects: {Default: true, PreRelease: featuregate.Beta},
|
||||||
AdvancedAuditing: {Default: true, PreRelease: featuregate.GA},
|
AdvancedAuditing: {Default: true, PreRelease: featuregate.GA},
|
||||||
APIResponseCompression: {Default: true, PreRelease: featuregate.Beta},
|
APIResponseCompression: {Default: true, PreRelease: featuregate.Beta},
|
||||||
APIListChunking: {Default: true, PreRelease: featuregate.Beta},
|
APIListChunking: {Default: true, PreRelease: featuregate.Beta},
|
||||||
DryRun: {Default: true, PreRelease: featuregate.GA},
|
DryRun: {Default: true, PreRelease: featuregate.GA},
|
||||||
RemainingItemCount: {Default: true, PreRelease: featuregate.Beta},
|
RemainingItemCount: {Default: true, PreRelease: featuregate.Beta},
|
||||||
ServerSideApply: {Default: true, PreRelease: featuregate.Beta},
|
ServerSideApply: {Default: true, PreRelease: featuregate.Beta},
|
||||||
StorageVersionHash: {Default: true, PreRelease: featuregate.Beta},
|
StorageVersionHash: {Default: true, PreRelease: featuregate.Beta},
|
||||||
WatchBookmark: {Default: true, PreRelease: featuregate.GA, LockToDefault: true},
|
WatchBookmark: {Default: true, PreRelease: featuregate.GA, LockToDefault: true},
|
||||||
APIPriorityAndFairness: {Default: false, PreRelease: featuregate.Alpha},
|
APIPriorityAndFairness: {Default: false, PreRelease: featuregate.Alpha},
|
||||||
RemoveSelfLink: {Default: true, PreRelease: featuregate.Beta},
|
RemoveSelfLink: {Default: true, PreRelease: featuregate.Beta},
|
||||||
SelectorIndex: {Default: true, PreRelease: featuregate.Beta},
|
SelectorIndex: {Default: true, PreRelease: featuregate.Beta},
|
||||||
WarningHeaders: {Default: true, PreRelease: featuregate.Beta},
|
WarningHeaders: {Default: true, PreRelease: featuregate.Beta},
|
||||||
|
EfficientWatchResumption: {Default: false, PreRelease: featuregate.Alpha},
|
||||||
}
|
}
|
||||||
|
@ -1098,7 +1098,14 @@ func (lw *cacherListerWatcher) List(options metav1.ListOptions) (runtime.Object,
|
|||||||
|
|
||||||
// Implements cache.ListerWatcher interface.
|
// Implements cache.ListerWatcher interface.
|
||||||
func (lw *cacherListerWatcher) Watch(options metav1.ListOptions) (watch.Interface, error) {
|
func (lw *cacherListerWatcher) Watch(options metav1.ListOptions) (watch.Interface, error) {
|
||||||
return lw.storage.WatchList(context.TODO(), lw.resourcePrefix, storage.ListOptions{ResourceVersion: options.ResourceVersion, Predicate: storage.Everything})
|
opts := storage.ListOptions{
|
||||||
|
ResourceVersion: options.ResourceVersion,
|
||||||
|
Predicate: storage.Everything,
|
||||||
|
}
|
||||||
|
if utilfeature.DefaultFeatureGate.Enabled(features.EfficientWatchResumption) {
|
||||||
|
opts.ProgressNotify = true
|
||||||
|
}
|
||||||
|
return lw.storage.WatchList(context.TODO(), lw.resourcePrefix, opts)
|
||||||
}
|
}
|
||||||
|
|
||||||
// errWatcher implements watch.Interface to return a single error
|
// errWatcher implements watch.Interface to return a single error
|
||||||
|
@ -381,6 +381,29 @@ func (w *watchCache) doCacheResizeLocked(capacity int) {
|
|||||||
w.capacity = capacity
|
w.capacity = capacity
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (w *watchCache) UpdateResourceVersion(resourceVersion string) {
|
||||||
|
rv, err := w.versioner.ParseResourceVersion(resourceVersion)
|
||||||
|
if err != nil {
|
||||||
|
klog.Errorf("Couldn't parse resourceVersion: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
w.Lock()
|
||||||
|
defer w.Unlock()
|
||||||
|
w.resourceVersion = rv
|
||||||
|
|
||||||
|
// Don't dispatch bookmarks coming from the storage layer.
|
||||||
|
// They can be very frequent (even to the level of subseconds)
|
||||||
|
// to allow efficient watch resumption on kube-apiserver restarts,
|
||||||
|
// and propagating them down may overload the whole system.
|
||||||
|
//
|
||||||
|
// TODO: If at some point we decide the performance and scalability
|
||||||
|
// footprint is acceptable, this is the place to hook them in.
|
||||||
|
// However, we then need to check if this was called as a result
|
||||||
|
// of a bookmark event or regular Add/Update/Delete operation by
|
||||||
|
// checking if resourceVersion here has changed.
|
||||||
|
}
|
||||||
|
|
||||||
// List returns list of pointers to <storeElement> objects.
|
// List returns list of pointers to <storeElement> objects.
|
||||||
func (w *watchCache) List() []interface{} {
|
func (w *watchCache) List() []interface{} {
|
||||||
return w.store.List()
|
return w.store.List()
|
||||||
|
Loading…
Reference in New Issue
Block a user