From e1537fc41b9eecb0e5026bfc72b27e4a07d771ba Mon Sep 17 00:00:00 2001 From: Lukasz Szaszkiewicz Date: Mon, 30 Jan 2023 14:54:59 +0100 Subject: [PATCH] Add SendInitialEvents to storage ListOptions --- .../k8s.io/apiserver/pkg/registry/generic/registry/store.go | 6 +++--- .../apiserver/pkg/registry/generic/registry/store_test.go | 4 ++-- staging/src/k8s.io/apiserver/pkg/storage/interfaces.go | 6 ++++++ 3 files changed, 11 insertions(+), 5 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go index 7fa412ef3e2..fa23d29d6c9 100644 --- a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go +++ b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go @@ -1283,12 +1283,12 @@ func (e *Store) Watch(ctx context.Context, options *metainternalversion.ListOpti resourceVersion = options.ResourceVersion predicate.AllowWatchBookmarks = options.AllowWatchBookmarks } - return e.WatchPredicate(ctx, predicate, resourceVersion) + return e.WatchPredicate(ctx, predicate, resourceVersion, options.SendInitialEvents) } // WatchPredicate starts a watch for the items that matches. -func (e *Store) WatchPredicate(ctx context.Context, p storage.SelectionPredicate, resourceVersion string) (watch.Interface, error) { - storageOpts := storage.ListOptions{ResourceVersion: resourceVersion, Predicate: p, Recursive: true} +func (e *Store) WatchPredicate(ctx context.Context, p storage.SelectionPredicate, resourceVersion string, sendInitialEvents *bool) (watch.Interface, error) { + storageOpts := storage.ListOptions{ResourceVersion: resourceVersion, Predicate: p, Recursive: true, SendInitialEvents: sendInitialEvents} // if we're not already namespace-scoped, see if the field selector narrows the scope of the watch if requestNamespace, _ := genericapirequest.NamespaceFrom(ctx); len(requestNamespace) == 0 { diff --git a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store_test.go b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store_test.go index 28fe2877cfe..390f794065e 100644 --- a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store_test.go +++ b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store_test.go @@ -2215,7 +2215,7 @@ func TestStoreDeleteCollectionWithWatch(t *testing.T) { } podCreated := objCreated.(*example.Pod) - watcher, err := registry.WatchPredicate(testContext, matchPodName("foo"), podCreated.ResourceVersion) + watcher, err := registry.WatchPredicate(testContext, matchPodName("foo"), podCreated.ResourceVersion, nil) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -2276,7 +2276,7 @@ func TestStoreWatch(t *testing.T) { destroyFunc, registry := NewTestGenericStoreRegistry(t) defer destroyFunc() - wi, err := registry.WatchPredicate(ctx, m.selectPred, "0") + wi, err := registry.WatchPredicate(ctx, m.selectPred, "0", nil) if err != nil { t.Errorf("%v: unexpected error: %v", name, err) } else { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/interfaces.go b/staging/src/k8s.io/apiserver/pkg/storage/interfaces.go index 812aa412bb3..daf30a242f5 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/interfaces.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/interfaces.go @@ -268,4 +268,10 @@ type ListOptions struct { // ProgressNotify determines whether storage-originated bookmark (progress notify) events should // be delivered to the users. The option is ignored for non-watch requests. ProgressNotify bool + // SendInitialEvents, when set together with Watch option, + // begin the watch stream with synthetic init events to build the + // whole state of all resources followed by a synthetic "Bookmark" + // event containing a ResourceVersion after which the server + // continues streaming events. + SendInitialEvents *bool }