Merge pull request #127500 from p0lyn0mial/upstream-assign-rv-to-watchCacheInterval

cacher: prevents sending events with ResourceVersion < RequiredResourceVersion
This commit is contained in:
Kubernetes Prow Robot 2024-09-23 12:51:59 +01:00 committed by GitHub
commit c9d6fd9ff7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 40 additions and 11 deletions

View File

@ -454,6 +454,13 @@ func (c *cacheWatcher) processInterval(ctx context.Context, cacheInterval *watch
const initProcessThreshold = 500 * time.Millisecond
startTime := time.Now()
// cacheInterval may be created from a version being more fresh than requested
// (e.g. for NotOlderThan semantic). In such a case, we need to prevent watch event
// with lower resourceVersion from being delivered to ensure watch contract.
if cacheInterval.resourceVersion > resourceVersion {
resourceVersion = cacheInterval.resourceVersion
}
initEventCount := 0
for {
event, err := cacheInterval.Next()

View File

@ -746,7 +746,7 @@ func (w *watchCache) getAllEventsSinceLocked(resourceVersion uint64, key string,
indexerFunc := func(i int) *watchCacheEvent {
return w.cache[i%w.capacity]
}
ci := newCacheInterval(w.startIndex+first, w.endIndex, indexerFunc, w.indexValidator, w.RWMutex.RLocker())
ci := newCacheInterval(w.startIndex+first, w.endIndex, indexerFunc, w.indexValidator, resourceVersion, w.RWMutex.RLocker())
return ci, nil
}

View File

@ -91,6 +91,10 @@ type watchCacheInterval struct {
// lock on each invocation of Next().
buffer *watchCacheIntervalBuffer
// resourceVersion is the resourceVersion from which
// the interval was constructed.
resourceVersion uint64
// lock effectively protects access to the underlying source
// of events through - indexer and indexValidator.
//
@ -103,14 +107,15 @@ type attrFunc func(runtime.Object) (labels.Set, fields.Set, error)
type indexerFunc func(int) *watchCacheEvent
type indexValidator func(int) bool
func newCacheInterval(startIndex, endIndex int, indexer indexerFunc, indexValidator indexValidator, locker sync.Locker) *watchCacheInterval {
func newCacheInterval(startIndex, endIndex int, indexer indexerFunc, indexValidator indexValidator, resourceVersion uint64, locker sync.Locker) *watchCacheInterval {
return &watchCacheInterval{
startIndex: startIndex,
endIndex: endIndex,
indexer: indexer,
indexValidator: indexValidator,
buffer: &watchCacheIntervalBuffer{buffer: make([]*watchCacheEvent, bufferSize)},
lock: locker,
startIndex: startIndex,
endIndex: endIndex,
indexer: indexer,
indexValidator: indexValidator,
buffer: &watchCacheIntervalBuffer{buffer: make([]*watchCacheEvent, bufferSize)},
resourceVersion: resourceVersion,
lock: locker,
}
}
@ -172,8 +177,9 @@ func newCacheIntervalFromStore(resourceVersion uint64, store storeIndexer, getAt
ci := &watchCacheInterval{
startIndex: 0,
// Simulate that we already have all the events we're looking for.
endIndex: 0,
buffer: buffer,
endIndex: 0,
buffer: buffer,
resourceVersion: resourceVersion,
}
return ci, nil

View File

@ -41,7 +41,7 @@ func intervalFromEvents(events []*watchCacheEvent) *watchCacheInterval {
}
indexValidator := func(_ int) bool { return true }
return newCacheInterval(startIndex, endIndex, indexer, indexValidator, locker)
return newCacheInterval(startIndex, endIndex, indexer, indexValidator, 0, locker)
}
func bufferFromEvents(events []*watchCacheEvent) *watchCacheIntervalBuffer {
@ -300,6 +300,7 @@ func TestCacheIntervalNextFromWatchCache(t *testing.T) {
wc.endIndex,
indexerFunc,
wc.isIndexValidLocked,
wc.resourceVersion,
&wc.RWMutex,
)

View File

@ -1460,6 +1460,21 @@ func RunWatchSemantics(ctx context.Context, t *testing.T, store storage.Interfac
createdPods = append(createdPods, out)
}
if len(createdPods) > 0 {
// this list call ensures that the cache has seen the created pods.
// this makes the watch request below deterministic.
listObject := &example.PodList{}
opts := storage.ListOptions{
Predicate: storage.Everything,
Recursive: true,
ResourceVersion: createdPods[len(createdPods)-1].ResourceVersion,
ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
}
err := store.GetList(ctx, fmt.Sprintf("/pods/%s", ns), opts, listObject)
require.NoError(t, err)
require.Len(t, listObject.Items, len(createdPods))
}
if scenario.useCurrentRV {
currentStorageRV, err := storage.GetCurrentResourceVersionFromStorage(ctx, store, func() runtime.Object { return &example.PodList{} }, "/pods", "")
require.NoError(t, err)