mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-21 17:48:01 +00:00
Honor starting resourceVersion in watch cache
Compare the requested resourceVersion to each event's resourceVersion to ensure events that occurred in the past are not sent to the client.
This commit is contained in:
parent
1186f4bf85
commit
049e63d253
@ -264,7 +264,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string,
|
||||
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
watcher := newCacheWatcher(initEvents, filterFunction(key, c.keyFunc, filter), forgetWatcher(c, c.watcherIdx))
|
||||
watcher := newCacheWatcher(watchRV, initEvents, filterFunction(key, c.keyFunc, filter), forgetWatcher(c, c.watcherIdx))
|
||||
c.watchers[c.watcherIdx] = watcher
|
||||
c.watcherIdx++
|
||||
return watcher, nil
|
||||
@ -465,7 +465,7 @@ type cacheWatcher struct {
|
||||
forget func(bool)
|
||||
}
|
||||
|
||||
func newCacheWatcher(initEvents []watchCacheEvent, filter FilterFunc, forget func(bool)) *cacheWatcher {
|
||||
func newCacheWatcher(resourceVersion uint64, initEvents []watchCacheEvent, filter FilterFunc, forget func(bool)) *cacheWatcher {
|
||||
watcher := &cacheWatcher{
|
||||
input: make(chan watchCacheEvent, 10),
|
||||
result: make(chan watch.Event, 10),
|
||||
@ -473,7 +473,7 @@ func newCacheWatcher(initEvents []watchCacheEvent, filter FilterFunc, forget fun
|
||||
stopped: false,
|
||||
forget: forget,
|
||||
}
|
||||
go watcher.process(initEvents)
|
||||
go watcher.process(initEvents, resourceVersion)
|
||||
return watcher
|
||||
}
|
||||
|
||||
@ -537,7 +537,7 @@ func (c *cacheWatcher) sendWatchCacheEvent(event watchCacheEvent) {
|
||||
}
|
||||
}
|
||||
|
||||
func (c *cacheWatcher) process(initEvents []watchCacheEvent) {
|
||||
func (c *cacheWatcher) process(initEvents []watchCacheEvent, resourceVersion uint64) {
|
||||
defer utilruntime.HandleCrash()
|
||||
|
||||
for _, event := range initEvents {
|
||||
@ -550,6 +550,9 @@ func (c *cacheWatcher) process(initEvents []watchCacheEvent) {
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
// only send events newer than resourceVersion
|
||||
if event.ResourceVersion > resourceVersion {
|
||||
c.sendWatchCacheEvent(event)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -346,3 +346,51 @@ func TestFiltering(t *testing.T) {
|
||||
verifyWatchEvent(t, watcher, watch.Modified, podFooPrime)
|
||||
verifyWatchEvent(t, watcher, watch.Deleted, podFooPrime)
|
||||
}
|
||||
|
||||
func TestStartingResourceVersion(t *testing.T) {
|
||||
server, etcdStorage := newEtcdTestStorage(t, testapi.Default.Codec(), etcdtest.PathPrefix())
|
||||
defer server.Terminate(t)
|
||||
cacher := newTestCacher(etcdStorage)
|
||||
defer cacher.Stop()
|
||||
|
||||
// add 1 object
|
||||
podFoo := makeTestPod("foo")
|
||||
fooCreated := updatePod(t, etcdStorage, podFoo, nil)
|
||||
|
||||
// Set up Watch starting at fooCreated.ResourceVersion + 10
|
||||
rv, err := storage.ParseWatchResourceVersion(fooCreated.ResourceVersion)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
rv += 10
|
||||
startVersion := strconv.Itoa(int(rv))
|
||||
|
||||
watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", startVersion, storage.Everything)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
defer watcher.Stop()
|
||||
|
||||
lastFoo := fooCreated
|
||||
for i := 0; i < 11; i++ {
|
||||
podFooForUpdate := makeTestPod("foo")
|
||||
podFooForUpdate.Labels = map[string]string{"foo": strconv.Itoa(i)}
|
||||
lastFoo = updatePod(t, etcdStorage, podFooForUpdate, lastFoo)
|
||||
}
|
||||
|
||||
select {
|
||||
case e := <-watcher.ResultChan():
|
||||
pod := e.Object.(*api.Pod)
|
||||
podRV, err := storage.ParseWatchResourceVersion(pod.ResourceVersion)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
// event should have at least rv + 1, since we're starting the watch at rv
|
||||
if podRV <= rv {
|
||||
t.Errorf("expected event with resourceVersion of at least %d, got %d", rv+1, podRV)
|
||||
}
|
||||
case <-time.After(wait.ForeverTestTimeout):
|
||||
t.Errorf("timed out waiting for event")
|
||||
}
|
||||
}
|
||||
|
@ -45,6 +45,7 @@ type watchCacheEvent struct {
|
||||
Type watch.EventType
|
||||
Object runtime.Object
|
||||
PrevObject runtime.Object
|
||||
ResourceVersion uint64
|
||||
}
|
||||
|
||||
// watchCacheElement is a single "watch event" stored in a cache.
|
||||
@ -179,7 +180,7 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, upd
|
||||
if exists {
|
||||
prevObject = previous.(runtime.Object)
|
||||
}
|
||||
watchCacheEvent := watchCacheEvent{event.Type, event.Object, prevObject}
|
||||
watchCacheEvent := watchCacheEvent{event.Type, event.Object, prevObject, resourceVersion}
|
||||
if w.onEvent != nil {
|
||||
w.onEvent(watchCacheEvent)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user