From 53653e9b0a7145c3b6e133c5639194ff79b3719d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Tyczy=C5=84ski?= Date: Thu, 18 Nov 2021 12:40:26 +0100 Subject: [PATCH] Simplify storage.Interface by merging Watch and WatchList functions. --- .../pkg/registry/generic/registry/dryrun.go | 4 ---- .../pkg/registry/generic/registry/store.go | 18 +++++++----------- .../apiserver/pkg/storage/cacher/cacher.go | 8 ++------ .../pkg/storage/cacher/cacher_whitebox_test.go | 3 --- .../apiserver/pkg/storage/etcd3/store.go | 11 +---------- .../pkg/storage/etcd3/watcher_test.go | 5 +---- .../k8s.io/apiserver/pkg/storage/interfaces.go | 12 +++--------- .../apiserver/pkg/storage/tests/cacher_test.go | 8 ++++---- 8 files changed, 18 insertions(+), 51 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/dryrun.go b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/dryrun.go index e25684a8a53..f746d097889 100644 --- a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/dryrun.go +++ b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/dryrun.go @@ -60,10 +60,6 @@ func (s *DryRunnableStorage) Watch(ctx context.Context, key string, opts storage return s.Storage.Watch(ctx, key, opts) } -func (s *DryRunnableStorage) WatchList(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) { - return s.Storage.WatchList(ctx, key, opts) -} - func (s *DryRunnableStorage) Get(ctx context.Context, key string, opts storage.GetOptions, objPtr runtime.Object) error { return s.Storage.Get(ctx, key, opts, objPtr) } diff --git a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go index 9897a295240..81e6d919e21 100644 --- a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go +++ b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go @@ -1239,23 +1239,19 @@ func (e *Store) Watch(ctx context.Context, options *metainternalversion.ListOpti // WatchPredicate starts a watch for the items that matches. func (e *Store) WatchPredicate(ctx context.Context, p storage.SelectionPredicate, resourceVersion string) (watch.Interface, error) { - storageOpts := storage.ListOptions{ResourceVersion: resourceVersion, Predicate: p} + storageOpts := storage.ListOptions{ResourceVersion: resourceVersion, Predicate: p, Recursive: true} + + key := e.KeyRootFunc(ctx) if name, ok := p.MatchesSingle(); ok { - if key, err := e.KeyFunc(ctx, name); err == nil { - w, err := e.Storage.Watch(ctx, key, storageOpts) - if err != nil { - return nil, err - } - if e.Decorator != nil { - return newDecoratedWatcher(ctx, w, e.Decorator), nil - } - return w, nil + if k, err := e.KeyFunc(ctx, name); err == nil { + key = k + storageOpts.Recursive = false } // if we cannot extract a key based on the current context, the // optimization is skipped } - w, err := e.Storage.WatchList(ctx, e.KeyRootFunc(ctx), storageOpts) + w, err := e.Storage.Watch(ctx, key, storageOpts) if err != nil { return nil, err } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index 217fa72588c..6a71b757954 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -534,11 +534,6 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions return watcher, nil } -// WatchList implements storage.Interface. -func (c *Cacher) WatchList(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) { - return c.Watch(ctx, key, opts) -} - // Get implements storage.Interface. func (c *Cacher) Get(ctx context.Context, key string, opts storage.GetOptions, objPtr runtime.Object) error { if opts.ResourceVersion == "" { @@ -1136,11 +1131,12 @@ func (lw *cacherListerWatcher) Watch(options metav1.ListOptions) (watch.Interfac opts := storage.ListOptions{ ResourceVersion: options.ResourceVersion, Predicate: storage.Everything, + Recursive: true, } if utilfeature.DefaultFeatureGate.Enabled(features.EfficientWatchResumption) { opts.ProgressNotify = true } - return lw.storage.WatchList(context.TODO(), lw.resourcePrefix, opts) + return lw.storage.Watch(context.TODO(), lw.resourcePrefix, opts) } // errWatcher implements watch.Interface to return a single error diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go index 88819285f5a..bae4b4ca1fd 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go @@ -308,9 +308,6 @@ func (d *dummyStorage) Delete(_ context.Context, _ string, _ runtime.Object, _ * func (d *dummyStorage) Watch(_ context.Context, _ string, _ storage.ListOptions) (watch.Interface, error) { return newDummyWatch(), nil } -func (d *dummyStorage) WatchList(_ context.Context, _ string, _ storage.ListOptions) (watch.Interface, error) { - return newDummyWatch(), nil -} func (d *dummyStorage) Get(_ context.Context, _ string, _ storage.GetOptions, _ runtime.Object) error { return d.err } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go index 37d411c774f..59305cff505 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go @@ -862,21 +862,12 @@ func growSlice(v reflect.Value, maxCapacity int, sizes ...int) { // Watch implements storage.Interface.Watch. func (s *store) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) { - return s.watch(ctx, key, opts, false) -} - -// WatchList implements storage.Interface.WatchList. -func (s *store) WatchList(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) { - return s.watch(ctx, key, opts, true) -} - -func (s *store) watch(ctx context.Context, key string, opts storage.ListOptions, recursive bool) (watch.Interface, error) { rev, err := s.versioner.ParseResourceVersion(opts.ResourceVersion) if err != nil { return nil, err } key = path.Join(s.pathPrefix, key) - return s.watcher.Watch(ctx, key, int64(rev), recursive, opts.ProgressNotify, opts.Predicate) + return s.watcher.Watch(ctx, key, int64(rev), opts.Recursive, opts.ProgressNotify, opts.Predicate) } func (s *store) getState(getResp *clientv3.GetResponse, key string, v reflect.Value, ignoreNotFound bool) (*objState, error) { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go index 0361548e0a7..3f55398dd7f 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go @@ -43,9 +43,6 @@ import ( func TestWatch(t *testing.T) { testWatch(t, false) -} - -func TestWatchList(t *testing.T) { testWatch(t, true) } @@ -94,7 +91,7 @@ func testWatch(t *testing.T, recursive bool) { }, }} for i, tt := range tests { - w, err := store.watch(ctx, tt.key, storage.ListOptions{ResourceVersion: "0", Predicate: tt.pred}, recursive) + w, err := store.Watch(ctx, tt.key, storage.ListOptions{ResourceVersion: "0", Predicate: tt.pred, Recursive: recursive}) if err != nil { t.Fatalf("Watch failed: %v", err) } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/interfaces.go b/staging/src/k8s.io/apiserver/pkg/storage/interfaces.go index 176e52a599f..feda90e9f03 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/interfaces.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/interfaces.go @@ -183,15 +183,6 @@ type Interface interface { // and send it in an "ADDED" event, before watch starts. Watch(ctx context.Context, key string, opts ListOptions) (watch.Interface, error) - // WatchList begins watching the specified key's items. Items are decoded into API - // objects and any item selected by 'p' are sent down to returned watch.Interface. - // resourceVersion may be used to specify what version to begin watching, - // which should be the current resourceVersion, and no longer rv+1 - // (e.g. reconnecting without missing any updates). - // If resource version is "0", this interface will list current objects directory defined by key - // and send them in "ADDED" events, before watch starts. - WatchList(ctx context.Context, key string, opts ListOptions) (watch.Interface, error) - // Get unmarshals json found at key into objPtr. On a not found error, will either // return a zero object of the requested type, or an error, depending on 'opts.ignoreNotFound'. // Treats empty responses and nil response nodes exactly like a not found error. @@ -274,6 +265,9 @@ type ListOptions struct { ResourceVersionMatch metav1.ResourceVersionMatch // Predicate provides the selection rules for the list operation. Predicate SelectionPredicate + // Recursive determines whether the watch is defined for a single object or for the whole set + // of objects. The option is ignored for non-watch requests. + Recursive bool // ProgressNotify determines whether storage-originated bookmark (progress notify) events should // be delivered to the users. The option is ignored for non-watch requests. ProgressNotify bool diff --git a/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go index 9381b17ea4e..b859f664cd9 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go @@ -515,7 +515,7 @@ func TestWatcherTimeout(t *testing.T) { // Create a number of watchers that will not be reading any result. nonReadingWatchers := 50 for i := 0; i < nonReadingWatchers; i++ { - watcher, err := cacher.WatchList(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: startVersion, Predicate: storage.Everything}) + watcher, err := cacher.Watch(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: startVersion, Predicate: storage.Everything, Recursive: true}) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -523,7 +523,7 @@ func TestWatcherTimeout(t *testing.T) { } // Create a second watcher that will be reading result. - readingWatcher, err := cacher.WatchList(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: startVersion, Predicate: storage.Everything}) + readingWatcher, err := cacher.Watch(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: startVersion, Predicate: storage.Everything, Recursive: true}) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -688,7 +688,7 @@ func TestRandomWatchDeliver(t *testing.T) { } startVersion := strconv.Itoa(int(rv)) - watcher, err := cacher.WatchList(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: startVersion, Predicate: storage.Everything}) + watcher, err := cacher.Watch(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: startVersion, Predicate: storage.Everything, Recursive: true}) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -882,7 +882,7 @@ func TestWatchBookmarksWithCorrectResourceVersion(t *testing.T) { pred := storage.Everything pred.AllowWatchBookmarks = true ctx, _ := context.WithTimeout(context.Background(), 3*time.Second) - watcher, err := cacher.WatchList(ctx, "pods/ns", storage.ListOptions{ResourceVersion: "0", Predicate: pred}) + watcher, err := cacher.Watch(ctx, "pods/ns", storage.ListOptions{ResourceVersion: "0", Predicate: pred, Recursive: true}) if err != nil { t.Fatalf("Unexpected error: %v", err) }