Add SendInitialEvents to storage ListOptions

This commit is contained in:
Lukasz Szaszkiewicz 2023-01-30 14:54:59 +01:00
parent 4b3e0a39e5
commit e1537fc41b
3 changed files with 11 additions and 5 deletions

View File

@ -1283,12 +1283,12 @@ func (e *Store) Watch(ctx context.Context, options *metainternalversion.ListOpti
resourceVersion = options.ResourceVersion
predicate.AllowWatchBookmarks = options.AllowWatchBookmarks
}
return e.WatchPredicate(ctx, predicate, resourceVersion)
return e.WatchPredicate(ctx, predicate, resourceVersion, options.SendInitialEvents)
}
// WatchPredicate starts a watch for the items that matches.
func (e *Store) WatchPredicate(ctx context.Context, p storage.SelectionPredicate, resourceVersion string) (watch.Interface, error) {
storageOpts := storage.ListOptions{ResourceVersion: resourceVersion, Predicate: p, Recursive: true}
func (e *Store) WatchPredicate(ctx context.Context, p storage.SelectionPredicate, resourceVersion string, sendInitialEvents *bool) (watch.Interface, error) {
storageOpts := storage.ListOptions{ResourceVersion: resourceVersion, Predicate: p, Recursive: true, SendInitialEvents: sendInitialEvents}
// if we're not already namespace-scoped, see if the field selector narrows the scope of the watch
if requestNamespace, _ := genericapirequest.NamespaceFrom(ctx); len(requestNamespace) == 0 {

View File

@ -2215,7 +2215,7 @@ func TestStoreDeleteCollectionWithWatch(t *testing.T) {
}
podCreated := objCreated.(*example.Pod)
watcher, err := registry.WatchPredicate(testContext, matchPodName("foo"), podCreated.ResourceVersion)
watcher, err := registry.WatchPredicate(testContext, matchPodName("foo"), podCreated.ResourceVersion, nil)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
@ -2276,7 +2276,7 @@ func TestStoreWatch(t *testing.T) {
destroyFunc, registry := NewTestGenericStoreRegistry(t)
defer destroyFunc()
wi, err := registry.WatchPredicate(ctx, m.selectPred, "0")
wi, err := registry.WatchPredicate(ctx, m.selectPred, "0", nil)
if err != nil {
t.Errorf("%v: unexpected error: %v", name, err)
} else {

View File

@ -268,4 +268,10 @@ type ListOptions struct {
// ProgressNotify determines whether storage-originated bookmark (progress notify) events should
// be delivered to the users. The option is ignored for non-watch requests.
ProgressNotify bool
// SendInitialEvents, when set together with Watch option,
// begin the watch stream with synthetic init events to build the
// whole state of all resources followed by a synthetic "Bookmark"
// event containing a ResourceVersion after which the server
// continues streaming events.
SendInitialEvents *bool
}