diff --git a/pkg/storage/cacher.go b/pkg/storage/cacher.go index 57617e36f41..f246085545b 100644 --- a/pkg/storage/cacher.go +++ b/pkg/storage/cacher.go @@ -264,7 +264,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string, c.Lock() defer c.Unlock() - watcher := newCacheWatcher(initEvents, filterFunction(key, c.keyFunc, filter), forgetWatcher(c, c.watcherIdx)) + watcher := newCacheWatcher(watchRV, initEvents, filterFunction(key, c.keyFunc, filter), forgetWatcher(c, c.watcherIdx)) c.watchers[c.watcherIdx] = watcher c.watcherIdx++ return watcher, nil @@ -465,7 +465,7 @@ type cacheWatcher struct { forget func(bool) } -func newCacheWatcher(initEvents []watchCacheEvent, filter FilterFunc, forget func(bool)) *cacheWatcher { +func newCacheWatcher(resourceVersion uint64, initEvents []watchCacheEvent, filter FilterFunc, forget func(bool)) *cacheWatcher { watcher := &cacheWatcher{ input: make(chan watchCacheEvent, 10), result: make(chan watch.Event, 10), @@ -473,7 +473,7 @@ func newCacheWatcher(initEvents []watchCacheEvent, filter FilterFunc, forget fun stopped: false, forget: forget, } - go watcher.process(initEvents) + go watcher.process(initEvents, resourceVersion) return watcher } @@ -537,7 +537,7 @@ func (c *cacheWatcher) sendWatchCacheEvent(event watchCacheEvent) { } } -func (c *cacheWatcher) process(initEvents []watchCacheEvent) { +func (c *cacheWatcher) process(initEvents []watchCacheEvent, resourceVersion uint64) { defer utilruntime.HandleCrash() for _, event := range initEvents { @@ -550,6 +550,9 @@ func (c *cacheWatcher) process(initEvents []watchCacheEvent) { if !ok { return } - c.sendWatchCacheEvent(event) + // only send events newer than resourceVersion + if event.ResourceVersion > resourceVersion { + c.sendWatchCacheEvent(event) + } } } diff --git a/pkg/storage/cacher_test.go b/pkg/storage/cacher_test.go index 35cef406de5..ddbba037807 100644 --- a/pkg/storage/cacher_test.go +++ b/pkg/storage/cacher_test.go @@ -346,3 +346,51 @@ func TestFiltering(t *testing.T) { verifyWatchEvent(t, watcher, watch.Modified, podFooPrime) verifyWatchEvent(t, watcher, watch.Deleted, podFooPrime) } + +func TestStartingResourceVersion(t *testing.T) { + server, etcdStorage := newEtcdTestStorage(t, testapi.Default.Codec(), etcdtest.PathPrefix()) + defer server.Terminate(t) + cacher := newTestCacher(etcdStorage) + defer cacher.Stop() + + // add 1 object + podFoo := makeTestPod("foo") + fooCreated := updatePod(t, etcdStorage, podFoo, nil) + + // Set up Watch starting at fooCreated.ResourceVersion + 10 + rv, err := storage.ParseWatchResourceVersion(fooCreated.ResourceVersion) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + rv += 10 + startVersion := strconv.Itoa(int(rv)) + + watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", startVersion, storage.Everything) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer watcher.Stop() + + lastFoo := fooCreated + for i := 0; i < 11; i++ { + podFooForUpdate := makeTestPod("foo") + podFooForUpdate.Labels = map[string]string{"foo": strconv.Itoa(i)} + lastFoo = updatePod(t, etcdStorage, podFooForUpdate, lastFoo) + } + + select { + case e := <-watcher.ResultChan(): + pod := e.Object.(*api.Pod) + podRV, err := storage.ParseWatchResourceVersion(pod.ResourceVersion) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // event should have at least rv + 1, since we're starting the watch at rv + if podRV <= rv { + t.Errorf("expected event with resourceVersion of at least %d, got %d", rv+1, podRV) + } + case <-time.After(wait.ForeverTestTimeout): + t.Errorf("timed out waiting for event") + } +} diff --git a/pkg/storage/watch_cache.go b/pkg/storage/watch_cache.go index 87bce0c0e6a..3b34479fa2a 100644 --- a/pkg/storage/watch_cache.go +++ b/pkg/storage/watch_cache.go @@ -42,9 +42,10 @@ const ( // the previous value of the object to enable proper filtering in the // upper layers. type watchCacheEvent struct { - Type watch.EventType - Object runtime.Object - PrevObject runtime.Object + Type watch.EventType + Object runtime.Object + PrevObject runtime.Object + ResourceVersion uint64 } // watchCacheElement is a single "watch event" stored in a cache. @@ -179,7 +180,7 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, upd if exists { prevObject = previous.(runtime.Object) } - watchCacheEvent := watchCacheEvent{event.Type, event.Object, prevObject} + watchCacheEvent := watchCacheEvent{event.Type, event.Object, prevObject, resourceVersion} if w.onEvent != nil { w.onEvent(watchCacheEvent) }