Merge pull request #106528 from wojtek-t/cleanup_storage_watch_interface

Simplify storage.Interface by merging Watch and WatchList functions.
This commit is contained in:
Kubernetes Prow Robot 2021-12-07 18:26:01 -08:00 committed by GitHub
commit 9dc33fa22f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 18 additions and 51 deletions

View File

@ -60,10 +60,6 @@ func (s *DryRunnableStorage) Watch(ctx context.Context, key string, opts storage
return s.Storage.Watch(ctx, key, opts)
}
func (s *DryRunnableStorage) WatchList(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
return s.Storage.WatchList(ctx, key, opts)
}
func (s *DryRunnableStorage) Get(ctx context.Context, key string, opts storage.GetOptions, objPtr runtime.Object) error {
return s.Storage.Get(ctx, key, opts, objPtr)
}

View File

@ -1240,23 +1240,19 @@ func (e *Store) Watch(ctx context.Context, options *metainternalversion.ListOpti
// WatchPredicate starts a watch for the items that matches.
func (e *Store) WatchPredicate(ctx context.Context, p storage.SelectionPredicate, resourceVersion string) (watch.Interface, error) {
storageOpts := storage.ListOptions{ResourceVersion: resourceVersion, Predicate: p}
storageOpts := storage.ListOptions{ResourceVersion: resourceVersion, Predicate: p, Recursive: true}
key := e.KeyRootFunc(ctx)
if name, ok := p.MatchesSingle(); ok {
if key, err := e.KeyFunc(ctx, name); err == nil {
w, err := e.Storage.Watch(ctx, key, storageOpts)
if err != nil {
return nil, err
}
if e.Decorator != nil {
return newDecoratedWatcher(ctx, w, e.Decorator), nil
}
return w, nil
if k, err := e.KeyFunc(ctx, name); err == nil {
key = k
storageOpts.Recursive = false
}
// if we cannot extract a key based on the current context, the
// optimization is skipped
}
w, err := e.Storage.WatchList(ctx, e.KeyRootFunc(ctx), storageOpts)
w, err := e.Storage.Watch(ctx, key, storageOpts)
if err != nil {
return nil, err
}

View File

@ -534,11 +534,6 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
return watcher, nil
}
// WatchList implements storage.Interface.
func (c *Cacher) WatchList(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
return c.Watch(ctx, key, opts)
}
// Get implements storage.Interface.
func (c *Cacher) Get(ctx context.Context, key string, opts storage.GetOptions, objPtr runtime.Object) error {
if opts.ResourceVersion == "" {
@ -1136,11 +1131,12 @@ func (lw *cacherListerWatcher) Watch(options metav1.ListOptions) (watch.Interfac
opts := storage.ListOptions{
ResourceVersion: options.ResourceVersion,
Predicate: storage.Everything,
Recursive: true,
}
if utilfeature.DefaultFeatureGate.Enabled(features.EfficientWatchResumption) {
opts.ProgressNotify = true
}
return lw.storage.WatchList(context.TODO(), lw.resourcePrefix, opts)
return lw.storage.Watch(context.TODO(), lw.resourcePrefix, opts)
}
// errWatcher implements watch.Interface to return a single error

View File

@ -308,9 +308,6 @@ func (d *dummyStorage) Delete(_ context.Context, _ string, _ runtime.Object, _ *
func (d *dummyStorage) Watch(_ context.Context, _ string, _ storage.ListOptions) (watch.Interface, error) {
return newDummyWatch(), nil
}
func (d *dummyStorage) WatchList(_ context.Context, _ string, _ storage.ListOptions) (watch.Interface, error) {
return newDummyWatch(), nil
}
func (d *dummyStorage) Get(_ context.Context, _ string, _ storage.GetOptions, _ runtime.Object) error {
return d.err
}

View File

@ -862,21 +862,12 @@ func growSlice(v reflect.Value, maxCapacity int, sizes ...int) {
// Watch implements storage.Interface.Watch.
func (s *store) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
return s.watch(ctx, key, opts, false)
}
// WatchList implements storage.Interface.WatchList.
func (s *store) WatchList(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
return s.watch(ctx, key, opts, true)
}
func (s *store) watch(ctx context.Context, key string, opts storage.ListOptions, recursive bool) (watch.Interface, error) {
rev, err := s.versioner.ParseResourceVersion(opts.ResourceVersion)
if err != nil {
return nil, err
}
key = path.Join(s.pathPrefix, key)
return s.watcher.Watch(ctx, key, int64(rev), recursive, opts.ProgressNotify, opts.Predicate)
return s.watcher.Watch(ctx, key, int64(rev), opts.Recursive, opts.ProgressNotify, opts.Predicate)
}
func (s *store) getState(getResp *clientv3.GetResponse, key string, v reflect.Value, ignoreNotFound bool) (*objState, error) {

View File

@ -43,9 +43,6 @@ import (
func TestWatch(t *testing.T) {
testWatch(t, false)
}
func TestWatchList(t *testing.T) {
testWatch(t, true)
}
@ -94,7 +91,7 @@ func testWatch(t *testing.T, recursive bool) {
},
}}
for i, tt := range tests {
w, err := store.watch(ctx, tt.key, storage.ListOptions{ResourceVersion: "0", Predicate: tt.pred}, recursive)
w, err := store.Watch(ctx, tt.key, storage.ListOptions{ResourceVersion: "0", Predicate: tt.pred, Recursive: recursive})
if err != nil {
t.Fatalf("Watch failed: %v", err)
}

View File

@ -183,15 +183,6 @@ type Interface interface {
// and send it in an "ADDED" event, before watch starts.
Watch(ctx context.Context, key string, opts ListOptions) (watch.Interface, error)
// WatchList begins watching the specified key's items. Items are decoded into API
// objects and any item selected by 'p' are sent down to returned watch.Interface.
// resourceVersion may be used to specify what version to begin watching,
// which should be the current resourceVersion, and no longer rv+1
// (e.g. reconnecting without missing any updates).
// If resource version is "0", this interface will list current objects directory defined by key
// and send them in "ADDED" events, before watch starts.
WatchList(ctx context.Context, key string, opts ListOptions) (watch.Interface, error)
// Get unmarshals json found at key into objPtr. On a not found error, will either
// return a zero object of the requested type, or an error, depending on 'opts.ignoreNotFound'.
// Treats empty responses and nil response nodes exactly like a not found error.
@ -274,6 +265,9 @@ type ListOptions struct {
ResourceVersionMatch metav1.ResourceVersionMatch
// Predicate provides the selection rules for the list operation.
Predicate SelectionPredicate
// Recursive determines whether the watch is defined for a single object or for the whole set
// of objects. The option is ignored for non-watch requests.
Recursive bool
// ProgressNotify determines whether storage-originated bookmark (progress notify) events should
// be delivered to the users. The option is ignored for non-watch requests.
ProgressNotify bool

View File

@ -515,7 +515,7 @@ func TestWatcherTimeout(t *testing.T) {
// Create a number of watchers that will not be reading any result.
nonReadingWatchers := 50
for i := 0; i < nonReadingWatchers; i++ {
watcher, err := cacher.WatchList(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: startVersion, Predicate: storage.Everything})
watcher, err := cacher.Watch(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: startVersion, Predicate: storage.Everything, Recursive: true})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
@ -523,7 +523,7 @@ func TestWatcherTimeout(t *testing.T) {
}
// Create a second watcher that will be reading result.
readingWatcher, err := cacher.WatchList(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: startVersion, Predicate: storage.Everything})
readingWatcher, err := cacher.Watch(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: startVersion, Predicate: storage.Everything, Recursive: true})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
@ -688,7 +688,7 @@ func TestRandomWatchDeliver(t *testing.T) {
}
startVersion := strconv.Itoa(int(rv))
watcher, err := cacher.WatchList(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: startVersion, Predicate: storage.Everything})
watcher, err := cacher.Watch(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: startVersion, Predicate: storage.Everything, Recursive: true})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
@ -882,7 +882,7 @@ func TestWatchBookmarksWithCorrectResourceVersion(t *testing.T) {
pred := storage.Everything
pred.AllowWatchBookmarks = true
ctx, _ := context.WithTimeout(context.Background(), 3*time.Second)
watcher, err := cacher.WatchList(ctx, "pods/ns", storage.ListOptions{ResourceVersion: "0", Predicate: pred})
watcher, err := cacher.Watch(ctx, "pods/ns", storage.ListOptions{ResourceVersion: "0", Predicate: pred, Recursive: true})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}