diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index 33c404633c7..c5f27b19ced 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -592,6 +592,18 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions identifier, ) + // note that c.waitUntilWatchCacheFreshAndForceAllEvents must be called without + // the c.watchCache.RLock held, otherwise we are at risk of a deadlock, + // mainly because the c.watchCache.processEvent method won't be able to make progress. + // + // moreover, even though c.waitUntilWatchCacheFreshAndForceAllEvents acquires a lock, + // it is safe to release the lock after the method finishes because we don't require + // any atomicity between the call to the method and further calls that actually get the events. + forceAllEvents, err := c.waitUntilWatchCacheFreshAndForceAllEvents(ctx, requestedWatchRV, opts) + if err != nil { + return newErrWatcher(err), nil + } + // We explicitly use thread unsafe version and do locking ourself to ensure that // no new events will be processed in the meantime. The watchCache will be unlocked // on return from this function. @@ -599,10 +611,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions // underlying watchCache is calling processEvent under its lock. 
c.watchCache.RLock() defer c.watchCache.RUnlock() - forceAllEvents, err := c.waitUntilWatchCacheFreshAndForceAllEvents(ctx, requestedWatchRV, opts) - if err != nil { - return newErrWatcher(err), nil - } + startWatchRV := startWatchResourceVersionFn() var cacheInterval *watchCacheInterval if forceAllEvents { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go index 7847c1eaa4f..0624d5e78e9 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go @@ -1823,6 +1823,7 @@ func TestGetCurrentResourceVersionFromStorage(t *testing.T) { } func TestWaitUntilWatchCacheFreshAndForceAllEvents(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchList, true)() backingStorage := &dummyStorage{} cacher, _, err := newTestCacher(backingStorage) if err != nil { @@ -1830,17 +1831,41 @@ func TestWaitUntilWatchCacheFreshAndForceAllEvents(t *testing.T) { } defer cacher.Stop() - forceAllEvents, err := cacher.waitUntilWatchCacheFreshAndForceAllEvents(context.TODO(), 105, storage.ListOptions{SendInitialEvents: pointer.Bool(true)}) - require.NotNil(t, err, "the target method should return non nil error") - require.Equal(t, err.Error(), "Timeout: Too large resource version: 105, current: 100") - require.False(t, forceAllEvents, "the target method after returning an error should NOT instruct the caller to ask for all events in the cache (full state)") + opts := storage.ListOptions{ + Predicate: storage.Everything, + SendInitialEvents: pointer.Bool(true), + ResourceVersion: "105", + } + opts.Predicate.AllowWatchBookmarks = true + + w, err := cacher.Watch(context.Background(), "pods/ns", opts) + require.NoError(t, err, "failed to create watch: %v") + defer w.Stop() + verifyEvents(t, w, []watch.Event{ + { + Type: watch.Error, 
+ Object: &metav1.Status{ + Status: metav1.StatusFailure, + Message: storage.NewTooLargeResourceVersionError(105, 100, resourceVersionTooHighRetrySeconds).Error(), + Details: storage.NewTooLargeResourceVersionError(105, 100, resourceVersionTooHighRetrySeconds).(*apierrors.StatusError).Status().Details, + Reason: metav1.StatusReasonTimeout, + Code: 504, + }, + }, + }, true) go func() { cacher.watchCache.Add(makeTestPodDetails("pod1", 105, "node1", map[string]string{"label": "value1"})) }() - forceAllEvents, err = cacher.waitUntilWatchCacheFreshAndForceAllEvents(context.TODO(), 105, storage.ListOptions{SendInitialEvents: pointer.Bool(true)}) - require.NoError(t, err) - require.True(t, forceAllEvents, "the target method should instruct the caller to ask for all events in the cache (full state)") + w, err = cacher.Watch(context.Background(), "pods/ns", opts) + require.NoError(t, err, "failed to create watch: %v") + defer w.Stop() + verifyEvents(t, w, []watch.Event{ + { + Type: watch.Added, + Object: makeTestPodDetails("pod1", 105, "node1", map[string]string{"label": "value1"}), + }, + }, true) } type fakeStorage struct {