mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-10 12:32:03 +00:00
cacher: prevent a potential deadlock
waitUntilWatchCacheFreshAndForceAllEvents must be called without a read lock held otherwise the watchcache won't be able to make progress (i.e. the watchCache.processEvent method that requries acquiring an exclusive lock) the deadlock can happen only when the alpha watchlist feature flag is on and the client specifically requests streaming.
This commit is contained in:
parent
c3e7eca7fd
commit
476e407ffd
@ -592,6 +592,18 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
|
||||
identifier,
|
||||
)
|
||||
|
||||
// note that c.waitUntilWatchCacheFreshAndForceAllEvents must be called without
|
||||
// the c.watchCache.RLock held otherwise we are at risk of a deadlock
|
||||
// mainly because c.watchCache.processEvent method won't be able to make progress
|
||||
//
|
||||
// moreover even though the c.waitUntilWatchCacheFreshAndForceAllEvents acquires a lock
|
||||
// it is safe to release the lock after the method finishes because we don't require
|
||||
// any atomicity between the call to the method and further calls that actually get the events.
|
||||
forceAllEvents, err := c.waitUntilWatchCacheFreshAndForceAllEvents(ctx, requestedWatchRV, opts)
|
||||
if err != nil {
|
||||
return newErrWatcher(err), nil
|
||||
}
|
||||
|
||||
// We explicitly use thread unsafe version and do locking ourself to ensure that
|
||||
// no new events will be processed in the meantime. The watchCache will be unlocked
|
||||
// on return from this function.
|
||||
@ -599,10 +611,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
|
||||
// underlying watchCache is calling processEvent under its lock.
|
||||
c.watchCache.RLock()
|
||||
defer c.watchCache.RUnlock()
|
||||
forceAllEvents, err := c.waitUntilWatchCacheFreshAndForceAllEvents(ctx, requestedWatchRV, opts)
|
||||
if err != nil {
|
||||
return newErrWatcher(err), nil
|
||||
}
|
||||
|
||||
startWatchRV := startWatchResourceVersionFn()
|
||||
var cacheInterval *watchCacheInterval
|
||||
if forceAllEvents {
|
||||
|
@ -1811,6 +1811,7 @@ func TestGetCurrentResourceVersionFromStorage(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestWaitUntilWatchCacheFreshAndForceAllEvents(t *testing.T) {
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchList, true)()
|
||||
backingStorage := &dummyStorage{}
|
||||
cacher, _, err := newTestCacher(backingStorage)
|
||||
if err != nil {
|
||||
@ -1818,15 +1819,39 @@ func TestWaitUntilWatchCacheFreshAndForceAllEvents(t *testing.T) {
|
||||
}
|
||||
defer cacher.Stop()
|
||||
|
||||
forceAllEvents, err := cacher.waitUntilWatchCacheFreshAndForceAllEvents(context.TODO(), 105, storage.ListOptions{SendInitialEvents: pointer.Bool(true)})
|
||||
require.NotNil(t, err, "the target method should return non nil error")
|
||||
require.Equal(t, err.Error(), "Timeout: Too large resource version: 105, current: 100")
|
||||
require.False(t, forceAllEvents, "the target method after returning an error should NOT instruct the caller to ask for all events in the cache (full state)")
|
||||
opts := storage.ListOptions{
|
||||
Predicate: storage.Everything,
|
||||
SendInitialEvents: pointer.Bool(true),
|
||||
ResourceVersion: "105",
|
||||
}
|
||||
opts.Predicate.AllowWatchBookmarks = true
|
||||
|
||||
w, err := cacher.Watch(context.Background(), "pods/ns", opts)
|
||||
require.NoError(t, err, "failed to create watch: %v")
|
||||
defer w.Stop()
|
||||
verifyEvents(t, w, []watch.Event{
|
||||
{
|
||||
Type: watch.Error,
|
||||
Object: &metav1.Status{
|
||||
Status: metav1.StatusFailure,
|
||||
Message: storage.NewTooLargeResourceVersionError(105, 100, resourceVersionTooHighRetrySeconds).Error(),
|
||||
Details: storage.NewTooLargeResourceVersionError(105, 100, resourceVersionTooHighRetrySeconds).(*apierrors.StatusError).Status().Details,
|
||||
Reason: metav1.StatusReasonTimeout,
|
||||
Code: 504,
|
||||
},
|
||||
},
|
||||
}, true)
|
||||
|
||||
go func() {
|
||||
cacher.watchCache.Add(makeTestPodDetails("pod1", 105, "node1", map[string]string{"label": "value1"}))
|
||||
}()
|
||||
forceAllEvents, err = cacher.waitUntilWatchCacheFreshAndForceAllEvents(context.TODO(), 105, storage.ListOptions{SendInitialEvents: pointer.Bool(true)})
|
||||
require.NoError(t, err)
|
||||
require.True(t, forceAllEvents, "the target method should instruct the caller to ask for all events in the cache (full state)")
|
||||
w, err = cacher.Watch(context.Background(), "pods/ns", opts)
|
||||
require.NoError(t, err, "failed to create watch: %v")
|
||||
defer w.Stop()
|
||||
verifyEvents(t, w, []watch.Event{
|
||||
{
|
||||
Type: watch.Added,
|
||||
Object: makeTestPodDetails("pod1", 105, "node1", map[string]string{"label": "value1"}),
|
||||
},
|
||||
}, true)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user