diff --git a/pkg/storage/cacher_test.go b/pkg/storage/cacher_test.go index b0c61e86e14..57d6a01d8bf 100644 --- a/pkg/storage/cacher_test.go +++ b/pkg/storage/cacher_test.go @@ -212,8 +212,8 @@ func TestWatch(t *testing.T) { Action: "create", Node: &etcd.Node{ Value: string(runtime.EncodeOrDie(testapi.Default.Codec(), podFoo)), - CreatedIndex: 1, - ModifiedIndex: 1, + CreatedIndex: 2, + ModifiedIndex: 2, }, }, event: watch.Added, @@ -225,8 +225,8 @@ func TestWatch(t *testing.T) { Action: "create", Node: &etcd.Node{ Value: string(runtime.EncodeOrDie(testapi.Default.Codec(), podBar)), - CreatedIndex: 2, - ModifiedIndex: 2, + CreatedIndex: 3, + ModifiedIndex: 3, }, }, event: watch.Added, @@ -238,13 +238,13 @@ func TestWatch(t *testing.T) { Action: "set", Node: &etcd.Node{ Value: string(runtime.EncodeOrDie(testapi.Default.Codec(), podFoo)), - CreatedIndex: 1, - ModifiedIndex: 3, + CreatedIndex: 2, + ModifiedIndex: 4, }, PrevNode: &etcd.Node{ Value: string(runtime.EncodeOrDie(testapi.Default.Codec(), podFoo)), - CreatedIndex: 1, - ModifiedIndex: 1, + CreatedIndex: 2, + ModifiedIndex: 2, }, }, event: watch.Modified, @@ -253,7 +253,7 @@ func TestWatch(t *testing.T) { } // Set up Watch for object "podFoo". - watcher, err := cacher.Watch("pods/ns/foo", 1, storage.Everything) + watcher, err := cacher.Watch("pods/ns/foo", 2, storage.Everything) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -276,13 +276,13 @@ func TestWatch(t *testing.T) { } // Check whether we get too-old error. - _, err = cacher.Watch("pods/ns/foo", 0, storage.Everything) + _, err = cacher.Watch("pods/ns/foo", 1, storage.Everything) if err == nil { - t.Errorf("expected 'error too old' error") + t.Errorf("exepcted 'error too old' error") } // Now test watch with initial state. - initialWatcher, err := cacher.Watch("pods/ns/foo", 1, storage.Everything) + initialWatcher, err := cacher.Watch("pods/ns/foo", 2, storage.Everything) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -302,6 +302,39 @@ func TestWatch(t *testing.T) { } } + // Now test watch from "now". + nowWatcher, err := cacher.Watch("pods/ns/foo", 0, storage.Everything) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + select { + case event := <-nowWatcher.ResultChan(): + if obj := event.Object.(*api.Pod); event.Type != watch.Added || obj.ResourceVersion != "4" { + t.Errorf("unexpected event: %v", event) + } + case <-time.After(time.Millisecond * 100): + t.Errorf("timed out waiting for an event") + } + // Emit a new event and check if it is observed by the watcher. + fakeClient.WatchResponse <- &etcd.Response{ + Action: "set", + Node: &etcd.Node{ + Value: string(runtime.EncodeOrDie(testapi.Default.Codec(), podFoo)), + CreatedIndex: 2, + ModifiedIndex: 5, + }, + PrevNode: &etcd.Node{ + Value: string(runtime.EncodeOrDie(testapi.Default.Codec(), podFoo)), + CreatedIndex: 2, + ModifiedIndex: 4, + }, + } + event := <-nowWatcher.ResultChan() + obj := event.Object.(*api.Pod) + if event.Type != watch.Modified || obj.ResourceVersion != "5" { + t.Errorf("unexpected event: %v", event) + } + close(fakeClient.WatchResponse) } diff --git a/pkg/storage/watch_cache.go b/pkg/storage/watch_cache.go index 52f04eeddd8..5535d41d417 100644 --- a/pkg/storage/watch_cache.go +++ b/pkg/storage/watch_cache.go @@ -250,6 +250,20 @@ func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]wa if size > 0 { oldest = w.cache[w.startIndex%w.capacity].resourceVersion } + if resourceVersion == 0 { + // resourceVersion = 0 means that we don't require any specific starting point + // and we would like to start watching from ~now. + // However, to keep backward compatibility, we additionally need to return the + // current state and only then start watching from that point. + // + // TODO: In v2 api, we should stop returning the current state - #13969. + allItems := w.store.List() + result := make([]watchCacheEvent, len(allItems)) + for i, item := range allItems { + result[i] = watchCacheEvent{Type: watch.Added, Object: item.(runtime.Object)} + } + return result, nil + } if resourceVersion < oldest { return nil, fmt.Errorf("too old resource version: %d (%d)", resourceVersion, oldest) }