From 476e407ffd2ab393840d3f7a9fd01b71698738a3 Mon Sep 17 00:00:00 2001
From: Lukasz Szaszkiewicz
Date: Thu, 6 Apr 2023 09:54:02 +0200
Subject: [PATCH] cacher: prevent a potential deadlock

waitUntilWatchCacheFreshAndForceAllEvents must be called without a read
lock held; otherwise the watchCache won't be able to make progress
(i.e. the watchCache.processEvent method, which requires acquiring an
exclusive lock, would block).

The deadlock can happen only when the alpha WatchList feature flag is on
and the client specifically requests streaming.
---
 .../apiserver/pkg/storage/cacher/cacher.go    | 17 ++++++--
 .../storage/cacher/cacher_whitebox_test.go    | 39 +++++++++++++++----
 2 files changed, 45 insertions(+), 11 deletions(-)

diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
index eada35b1d0a..6e20f52c5a1 100644
--- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
+++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
@@ -592,6 +592,18 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
 		identifier,
 	)
 
+	// note that c.waitUntilWatchCacheFreshAndForceAllEvents must be called without
+	// the c.watchCache.RLock held otherwise we are at risk of a deadlock
+	// mainly because c.watchCache.processEvent method won't be able to make progress
+	//
+	// moreover even though the c.waitUntilWatchCacheFreshAndForceAllEvents acquires a lock
+	// it is safe to release the lock after the method finishes because we don't require
+	// any atomicity between the call to the method and further calls that actually get the events.
+	forceAllEvents, err := c.waitUntilWatchCacheFreshAndForceAllEvents(ctx, requestedWatchRV, opts)
+	if err != nil {
+		return newErrWatcher(err), nil
+	}
+
 	// We explicitly use thread unsafe version and do locking ourself to ensure that
 	// no new events will be processed in the meantime. The watchCache will be unlocked
 	// on return from this function.
@@ -599,10 +611,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
 	// underlying watchCache is calling processEvent under its lock.
 	c.watchCache.RLock()
 	defer c.watchCache.RUnlock()
-	forceAllEvents, err := c.waitUntilWatchCacheFreshAndForceAllEvents(ctx, requestedWatchRV, opts)
-	if err != nil {
-		return newErrWatcher(err), nil
-	}
+
 	startWatchRV := startWatchResourceVersionFn()
 	var cacheInterval *watchCacheInterval
 	if forceAllEvents {
diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go
index 152d92b9aea..b7885172340 100644
--- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go
+++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go
@@ -1811,6 +1811,7 @@ func TestGetCurrentResourceVersionFromStorage(t *testing.T) {
 }
 
 func TestWaitUntilWatchCacheFreshAndForceAllEvents(t *testing.T) {
+	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchList, true)()
 	backingStorage := &dummyStorage{}
 	cacher, _, err := newTestCacher(backingStorage)
 	if err != nil {
@@ -1818,15 +1819,39 @@ func TestWaitUntilWatchCacheFreshAndForceAllEvents(t *testing.T) {
 	}
 	defer cacher.Stop()
 
-	forceAllEvents, err := cacher.waitUntilWatchCacheFreshAndForceAllEvents(context.TODO(), 105, storage.ListOptions{SendInitialEvents: pointer.Bool(true)})
-	require.NotNil(t, err, "the target method should return non nil error")
-	require.Equal(t, err.Error(), "Timeout: Too large resource version: 105, current: 100")
-	require.False(t, forceAllEvents, "the target method after returning an error should NOT instruct the caller to ask for all events in the cache (full state)")
+	opts := storage.ListOptions{
+		Predicate:         storage.Everything,
+		SendInitialEvents: pointer.Bool(true),
+		ResourceVersion:   "105",
+	}
+	opts.Predicate.AllowWatchBookmarks = true
+
+	w, err := cacher.Watch(context.Background(), "pods/ns", opts)
+	require.NoError(t, err, "failed to create watch: %v")
+	defer w.Stop()
+	verifyEvents(t, w, []watch.Event{
+		{
+			Type: watch.Error,
+			Object: &metav1.Status{
+				Status:  metav1.StatusFailure,
+				Message: storage.NewTooLargeResourceVersionError(105, 100, resourceVersionTooHighRetrySeconds).Error(),
+				Details: storage.NewTooLargeResourceVersionError(105, 100, resourceVersionTooHighRetrySeconds).(*apierrors.StatusError).Status().Details,
+				Reason:  metav1.StatusReasonTimeout,
+				Code:    504,
+			},
+		},
+	}, true)
 
 	go func() {
 		cacher.watchCache.Add(makeTestPodDetails("pod1", 105, "node1", map[string]string{"label": "value1"}))
 	}()
-	forceAllEvents, err = cacher.waitUntilWatchCacheFreshAndForceAllEvents(context.TODO(), 105, storage.ListOptions{SendInitialEvents: pointer.Bool(true)})
-	require.NoError(t, err)
-	require.True(t, forceAllEvents, "the target method should instruct the caller to ask for all events in the cache (full state)")
+	w, err = cacher.Watch(context.Background(), "pods/ns", opts)
+	require.NoError(t, err, "failed to create watch: %v")
+	defer w.Stop()
+	verifyEvents(t, w, []watch.Event{
+		{
+			Type:   watch.Added,
+			Object: makeTestPodDetails("pod1", 105, "node1", map[string]string{"label": "value1"}),
+		},
+	}, true)
 }
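
Note (illustrative, not part of the patch): the commit message describes a
classic RWMutex self-starvation: a goroutine waits, under a read lock, for
progress that only a writer can make, while the writer is blocked by that
very read lock. The sketch below is a minimal, self-contained Go program
showing the pattern and the fix. The cache type and the waitUntilFresh /
processEvent names are hypothetical stand-ins for watchCache and its
methods, not the real cacher API.

package main

import (
	"fmt"
	"sync"
	"time"
)

// cache stands in for watchCache: readers hold mu.RLock, event processing
// needs the exclusive mu.Lock to advance resourceVersion, and a condition
// variable bound to the read lock signals freshness.
type cache struct {
	mu              sync.RWMutex
	cond            *sync.Cond
	resourceVersion uint64
}

func newCache() *cache {
	c := &cache{}
	c.cond = sync.NewCond(c.mu.RLocker())
	return c
}

// processEvent is the writer: it blocks for as long as any read lock is held.
func (c *cache) processEvent(rv uint64) {
	c.mu.Lock()
	c.resourceVersion = rv
	c.mu.Unlock()
	c.cond.Broadcast()
}

// waitUntilFresh takes its own read lock and waits until resourceVersion
// reaches rv. cond.Wait releases only this inner read lock while waiting;
// a read lock already held by the caller stays held.
func (c *cache) waitUntilFresh(rv uint64) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	for c.resourceVersion < rv {
		c.cond.Wait()
	}
}

func main() {
	c := newCache()

	// The event that will eventually make the cache fresh; it needs mu.Lock.
	go func() {
		time.Sleep(100 * time.Millisecond)
		c.processEvent(105)
	}()

	// The bug: waiting while holding the read lock. processEvent can never
	// acquire the write lock, resourceVersion never reaches 105, and the
	// wait blocks forever:
	//
	//	c.mu.RLock()
	//	c.waitUntilFresh(105) // deadlock
	//	c.mu.RUnlock()

	// The fix: wait first, without the read lock, then lock to read events.
	c.waitUntilFresh(105)
	c.mu.RLock()
	fmt.Println("cache is fresh at resourceVersion", c.resourceVersion)
	c.mu.RUnlock()
}

As in the patch, the cure is ordering rather than more locking: perform the
wait before taking the read lock, which is safe here because nothing requires
atomicity between the wait and the reads that follow it.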