diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_test.go index 013fcf81e3b..cc8f715c700 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_test.go @@ -390,6 +390,12 @@ func TestWatchSemanticInitialEventsExtended(t *testing.T) { storagetesting.RunWatchSemanticInitialEventsExtended(context.TODO(), t, store) } +func TestWatchListMatchSingle(t *testing.T) { + store, terminate := testSetupWithEtcdAndCreateWrapper(t) + t.Cleanup(terminate) + storagetesting.RunWatchListMatchSingle(context.TODO(), t, store) +} + // =================================================== // Test-setup related function are following. // =================================================== diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go index 152c27daafd..04a2617d640 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go @@ -696,6 +696,7 @@ func (w *watchCache) isIndexValidLocked(index int) bool { // be called under the watchCache lock. func (w *watchCache) getAllEventsSinceLocked(resourceVersion uint64, key string, opts storage.ListOptions) (*watchCacheInterval, error) { _, matchesSingle := opts.Predicate.MatchesSingle() + matchesSingle = matchesSingle && !opts.Recursive if opts.SendInitialEvents != nil && *opts.SendInitialEvents { return w.getIntervalFromStoreLocked(key, matchesSingle) } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go index b8c046d22cd..1d809ff6f44 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go @@ -144,6 +144,11 @@ func TestEtcdWatchSemanticInitialEventsExtended(t *testing.T) { storagetesting.RunWatchSemanticInitialEventsExtended(ctx, t, store) } +func TestWatchListMatchSingle(t *testing.T) { + ctx, store, _ := testSetup(t) + storagetesting.RunWatchListMatchSingle(ctx, t, store) +} + // ======================================================================= // Implementation-specific tests are following. // The following tests are exercising the details of the implementation diff --git a/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go b/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go index dee88a0e61d..89672d80624 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go @@ -1571,6 +1571,61 @@ func RunWatchSemanticInitialEventsExtended(ctx context.Context, t *testing.T, st testCheckNoMoreResults(t, w) } +func RunWatchListMatchSingle(ctx context.Context, t *testing.T, store storage.Interface) { + trueVal := true + expectedInitialEventsInStrictOrder := func(initialPod *example.Pod, globalResourceVersion string) []watch.Event { + watchEvents := []watch.Event{} + watchEvents = append(watchEvents, watch.Event{Type: watch.Added, Object: initialPod}) + watchEvents = append(watchEvents, watch.Event{Type: watch.Bookmark, Object: &example.Pod{ + ObjectMeta: metav1.ObjectMeta{ + ResourceVersion: globalResourceVersion, + Annotations: map[string]string{metav1.InitialEventsAnnotationKey: "true"}, + }, + }}) + return watchEvents + } + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchList, true) + + // add the pod for which the field selector will be constructed + ns := "ns-foo" + expectedPod := &example.Pod{} + initialPod := makePod("1") + initialPod.Namespace = ns + err := store.Create(ctx, computePodKey(initialPod), initialPod, expectedPod, 0) + require.NoError(t, err, "failed to add a pod: %v") + + // add more pods that won't match the field selector + lastAddedPod := &example.Pod{} + for _, otherPod := range []*example.Pod{makePod("2"), makePod("3"), makePod("4"), makePod("5")} { + otherPod.Namespace = ns + err = store.Create(ctx, computePodKey(otherPod), otherPod, lastAddedPod, 0) + require.NoError(t, err, "failed to add a pod: %v") + } + + opts := storage.ListOptions{ + Predicate: storage.SelectionPredicate{ + Label: labels.Everything(), + Field: fields.ParseSelectorOrDie("metadata.name=pod-1"), + GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) { + pod := obj.(*example.Pod) + return nil, fields.Set{"metadata.name": pod.Name}, nil + }, + }, + Recursive: true, + } + opts.SendInitialEvents = &trueVal + opts.Predicate.AllowWatchBookmarks = true + + w, err := store.Watch(context.Background(), "/pods", opts) + require.NoError(t, err, "failed to create watch: %v") + defer w.Stop() + + // make sure we only get a single pod matching the field selector + // followed by the bookmark with the global RV + testCheckResultsInStrictOrder(t, w, expectedInitialEventsInStrictOrder(expectedPod, lastAddedPod.ResourceVersion)) + testCheckNoMoreResults(t, w) +} + func makePod(namePrefix string) *example.Pod { return &example.Pod{ ObjectMeta: metav1.ObjectMeta{