From d31ff83fdc770e9a0ce42cab8f049722ac765720 Mon Sep 17 00:00:00 2001 From: Wojciech Tyczynski Date: Wed, 21 Dec 2016 12:05:30 +0100 Subject: [PATCH] Fix bug of delivering random parts of events --- pkg/storage/cacher.go | 20 ++++++++++++++++++- pkg/storage/cacher_test.go | 41 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 60 insertions(+), 1 deletion(-) diff --git a/pkg/storage/cacher.go b/pkg/storage/cacher.go index f709abb1e19..ceaa203309b 100644 --- a/pkg/storage/cacher.go +++ b/pkg/storage/cacher.go @@ -859,9 +859,27 @@ func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) { case !curObjPasses && oldObjPasses: watchEvent = watch.Event{Type: watch.Deleted, Object: object} } + + // We need to ensure that if we put event X to the c.result, all + // previous events were already put into it before, no matter whether + // c.done is close or not. + // Thus we cannot simply select from c.done and c.result and this + // would give us non-determinism. + // At the same time, we don't want to block infinitely on putting + // to c.result, when c.done is already closed. + + // This ensures that with c.done already close, we at most once go + // into the next select after this. With that, no matter which + // statement we choose there, we will deliver only consecutive + // events. + select { + case <-c.done: + return + default: + } + select { case c.result <- watchEvent: - // don't block on c.result if c.done is closed case <-c.done: } } diff --git a/pkg/storage/cacher_test.go b/pkg/storage/cacher_test.go index 34938ca3524..b917bd337cd 100644 --- a/pkg/storage/cacher_test.go +++ b/pkg/storage/cacher_test.go @@ -490,3 +490,44 @@ func TestStartingResourceVersion(t *testing.T) { t.Errorf("timed out waiting for event") } } + +func TestRandomWatchDeliver(t *testing.T) { + server, etcdStorage := newEtcdTestStorage(t, testapi.Default.Codec(), etcdtest.PathPrefix()) + defer server.Terminate(t) + cacher := newTestCacher(etcdStorage, 10) + defer cacher.Stop() + + fooCreated := updatePod(t, etcdStorage, makeTestPod("foo"), nil) + rv, err := storage.ParseWatchResourceVersion(fooCreated.ResourceVersion) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + startVersion := strconv.Itoa(int(rv)) + + watcher, err := cacher.WatchList(context.TODO(), "pods/ns", startVersion, storage.Everything) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // Now we can create exactly 21 events that should be delivered + // to the watcher, before it will completely block cacher and as + // a result will be dropped. + for i := 0; i < 21; i++ { + updatePod(t, etcdStorage, makeTestPod(fmt.Sprintf("foo-%d", i)), nil) + } + + // Now stop the watcher and check if the consecutive events are being delivered. + watcher.Stop() + + watched := 0 + for { + event, ok := <-watcher.ResultChan() + if !ok { + break + } + if a, e := event.Object.(*api.Pod).Name, fmt.Sprintf("foo-%d", watched); e != a { + t.Errorf("Unexpected object watched: %s, expected %s", a, e) + } + watched++ + } +}