From 793da62c7f7abbf52e03b5823d4e5250baf5adff Mon Sep 17 00:00:00 2001 From: Wojciech Tyczynski Date: Fri, 4 Dec 2015 09:58:24 +0100 Subject: [PATCH] Change resourceVersion to string in storage.Interface --- pkg/registry/generic/etcd/etcd.go | 14 +++--------- pkg/storage/cacher.go | 32 ++++++++++++++++----------- pkg/storage/cacher_test.go | 20 ++++++++++------- pkg/storage/etcd/etcd_helper.go | 18 ++++++++++----- pkg/storage/etcd/etcd_helper_test.go | 6 ++--- pkg/storage/etcd/etcd_watcher_test.go | 12 +++++----- pkg/storage/interfaces.go | 6 ++--- pkg/storage/util.go | 14 ++++++++++-- pkg/storage/util_test.go | 3 +-- test/integration/etcd_tools_test.go | 2 +- 10 files changed, 73 insertions(+), 54 deletions(-) diff --git a/pkg/registry/generic/etcd/etcd.go b/pkg/registry/generic/etcd/etcd.go index faf60083068..1c5105b19de 100644 --- a/pkg/registry/generic/etcd/etcd.go +++ b/pkg/registry/generic/etcd/etcd.go @@ -188,11 +188,7 @@ func (e *Etcd) ListPredicate(ctx api.Context, m generic.Matcher, options *unvers if options == nil { options = &unversioned.ListOptions{ResourceVersion: "0"} } - version, err := storage.ParseWatchResourceVersion(options.ResourceVersion, e.EndpointName) - if err != nil { - return nil, err - } - err = e.Storage.List(ctx, e.KeyRootFunc(ctx), version, filterFunc, list) + err := e.Storage.List(ctx, e.KeyRootFunc(ctx), options.ResourceVersion, filterFunc, list) return list, etcderr.InterpretListError(err, e.EndpointName) } @@ -479,10 +475,6 @@ func (e *Etcd) Watch(ctx api.Context, options *unversioned.ListOptions) (watch.I // WatchPredicate starts a watch for the items that m matches. func (e *Etcd) WatchPredicate(ctx api.Context, m generic.Matcher, resourceVersion string) (watch.Interface, error) { - version, err := storage.ParseWatchResourceVersion(resourceVersion, e.EndpointName) - if err != nil { - return nil, err - } filterFunc := e.filterAndDecorateFunction(m) if name, ok := m.MatchesSingle(); ok { @@ -490,12 +482,12 @@ func (e *Etcd) WatchPredicate(ctx api.Context, m generic.Matcher, resourceVersio if err != nil { return nil, err } - return e.Storage.Watch(ctx, key, version, filterFunc) + return e.Storage.Watch(ctx, key, resourceVersion, filterFunc) } // if we cannot extract a key based on the current context, the optimization is skipped } - return e.Storage.WatchList(ctx, e.KeyRootFunc(ctx), version, filterFunc) + return e.Storage.WatchList(ctx, e.KeyRootFunc(ctx), resourceVersion, filterFunc) } func (e *Etcd) filterAndDecorateFunction(m generic.Matcher) func(runtime.Object) bool { diff --git a/pkg/storage/cacher.go b/pkg/storage/cacher.go index 903ba5e0bf6..f49d5d1f304 100644 --- a/pkg/storage/cacher.go +++ b/pkg/storage/cacher.go @@ -223,7 +223,12 @@ func (c *Cacher) Delete(ctx context.Context, key string, out runtime.Object) err } // Implements storage.Interface. -func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) { +func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string, filter FilterFunc) (watch.Interface, error) { + watchRV, err := ParseWatchResourceVersion(resourceVersion) + if err != nil { + return nil, err + } + // Do NOT allow Watch to start when the underlying structures are not propagated. c.usable.RLock() defer c.usable.RUnlock() @@ -235,7 +240,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion uint64, // underlying watchCache is calling processEvent under its lock. c.watchCache.RLock() defer c.watchCache.RUnlock() - initEvents, err := c.watchCache.GetAllEventsSinceThreadUnsafe(resourceVersion) + initEvents, err := c.watchCache.GetAllEventsSinceThreadUnsafe(watchRV) if err != nil { return nil, err } @@ -249,7 +254,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion uint64, } // Implements storage.Interface. -func (c *Cacher) WatchList(ctx context.Context, key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) { +func (c *Cacher) WatchList(ctx context.Context, key string, resourceVersion string, filter FilterFunc) (watch.Interface, error) { return c.Watch(ctx, key, resourceVersion, filter) } @@ -264,11 +269,16 @@ func (c *Cacher) GetToList(ctx context.Context, key string, filter FilterFunc, l } // Implements storage.Interface. -func (c *Cacher) List(ctx context.Context, key string, resourceVersion uint64, filter FilterFunc, listObj runtime.Object) error { +func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, filter FilterFunc, listObj runtime.Object) error { if !c.ListFromCache { return c.storage.List(ctx, key, resourceVersion, filter, listObj) } + listRV, err := ParseListResourceVersion(resourceVersion) + if err != nil { + return err + } + // To avoid situation when List is proceesed before the underlying // watchCache is propagated for the first time, we acquire and immediately // release the 'usable' lock. @@ -277,7 +287,7 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion uint64, f c.usable.RLock() c.usable.RUnlock() - // List elements from cache, with at least 'resourceVersion'. + // List elements from cache, with at least 'listRV'. listPtr, err := meta.GetItemsPtr(listObj) if err != nil { return err @@ -288,7 +298,7 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion uint64, f } filterFunc := filterFunction(key, c.keyFunc, filter) - objs, resourceVersion := c.watchCache.WaitUntilFreshAndList(resourceVersion) + objs, readResourceVersion := c.watchCache.WaitUntilFreshAndList(listRV) for _, obj := range objs { object, ok := obj.(runtime.Object) if !ok { @@ -299,7 +309,7 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion uint64, f } } if c.versioner != nil { - if err := c.versioner.UpdateList(listObj, resourceVersion); err != nil { + if err := c.versioner.UpdateList(listObj, readResourceVersion); err != nil { return err } } @@ -389,7 +399,7 @@ func newCacherListerWatcher(storage Interface, resourcePrefix string, newListFun // Implements cache.ListerWatcher interface. func (lw *cacherListerWatcher) List() (runtime.Object, error) { list := lw.newListFunc() - if err := lw.storage.List(context.TODO(), lw.resourcePrefix, 0, Everything, list); err != nil { + if err := lw.storage.List(context.TODO(), lw.resourcePrefix, "", Everything, list); err != nil { return nil, err } return list, nil @@ -397,11 +407,7 @@ func (lw *cacherListerWatcher) List() (runtime.Object, error) { // Implements cache.ListerWatcher interface. func (lw *cacherListerWatcher) Watch(options unversioned.ListOptions) (watch.Interface, error) { - version, err := ParseWatchResourceVersion(options.ResourceVersion, lw.resourcePrefix) - if err != nil { - return nil, err - } - return lw.storage.WatchList(context.TODO(), lw.resourcePrefix, version, Everything) + return lw.storage.WatchList(context.TODO(), lw.resourcePrefix, options.ResourceVersion, Everything) } // cacherWatch implements watch.Interface diff --git a/pkg/storage/cacher_test.go b/pkg/storage/cacher_test.go index 3c09d1e1e4b..402a2004f08 100644 --- a/pkg/storage/cacher_test.go +++ b/pkg/storage/cacher_test.go @@ -114,7 +114,7 @@ func TestList(t *testing.T) { result := &api.PodList{} // TODO: We need to pass ResourceVersion of barPod deletion operation. // However, there is no easy way to get it, so it is hardcoded to 8. - if err := cacher.List(context.TODO(), "pods/ns", 8, storage.Everything, result); err != nil { + if err := cacher.List(context.TODO(), "pods/ns", "8", storage.Everything, result); err != nil { t.Errorf("Unexpected error: %v", err) } if result.ListMeta.ResourceVersion != "8" { @@ -179,7 +179,7 @@ func TestWatch(t *testing.T) { podFooBis.Spec.NodeName = "anotherFakeNode" // Set up Watch for object "podFoo". - watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", 1, storage.Everything) + watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", "1", storage.Everything) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -192,17 +192,19 @@ func TestWatch(t *testing.T) { verifyWatchEvent(t, watcher, watch.Modified, podFooPrime) // Check whether we get too-old error. - _, err = cacher.Watch(context.TODO(), "pods/ns/foo", 1, storage.Everything) + _, err = cacher.Watch(context.TODO(), "pods/ns/foo", "1", storage.Everything) if err == nil { t.Errorf("Expected 'error too old' error") } // Now test watch with initial state. + // We want to observe fooCreation too, so need to pass smaller resource version. initialVersion, err := strconv.Atoi(fooCreated.ResourceVersion) if err != nil { t.Fatalf("Incorrect resourceVersion: %s", fooCreated.ResourceVersion) } - initialWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", uint64(initialVersion), storage.Everything) + initialVersion-- + initialWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", strconv.Itoa(initialVersion), storage.Everything) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -211,7 +213,7 @@ func TestWatch(t *testing.T) { verifyWatchEvent(t, initialWatcher, watch.Modified, podFooPrime) // Now test watch from "now". - nowWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", 0, storage.Everything) + nowWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", "0", storage.Everything) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -229,14 +231,14 @@ func TestWatcherTimeout(t *testing.T) { cacher := newTestCacher(etcdStorage) // Create a watcher that will not be reading any result. - watcher, err := cacher.WatchList(context.TODO(), "pods/ns", 1, storage.Everything) + watcher, err := cacher.WatchList(context.TODO(), "pods/ns", "1", storage.Everything) if err != nil { t.Fatalf("Unexpected error: %v", err) } defer watcher.Stop() // Create a second watcher that will be reading result. - readingWatcher, err := cacher.WatchList(context.TODO(), "pods/ns", 1, storage.Everything) + readingWatcher, err := cacher.WatchList(context.TODO(), "pods/ns", "1", storage.Everything) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -281,11 +283,13 @@ func TestFiltering(t *testing.T) { } return selector.Matches(labels.Set(metadata.Labels())) } + // We want to observe fooCreation too, so need to pass smaller resource version. initialVersion, err := strconv.Atoi(fooCreated.ResourceVersion) if err != nil { t.Fatalf("Incorrect resourceVersion: %s", fooCreated.ResourceVersion) } - watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", uint64(initialVersion), filter) + initialVersion-- + watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", strconv.Itoa(initialVersion), filter) if err != nil { t.Fatalf("Unexpected error: %v", err) } diff --git a/pkg/storage/etcd/etcd_helper.go b/pkg/storage/etcd/etcd_helper.go index 9ae8cf9a9c3..d82d644a36a 100644 --- a/pkg/storage/etcd/etcd_helper.go +++ b/pkg/storage/etcd/etcd_helper.go @@ -191,24 +191,32 @@ func (h *etcdHelper) Delete(ctx context.Context, key string, out runtime.Object) } // Implements storage.Interface. -func (h *etcdHelper) Watch(ctx context.Context, key string, resourceVersion uint64, filter storage.FilterFunc) (watch.Interface, error) { +func (h *etcdHelper) Watch(ctx context.Context, key string, resourceVersion string, filter storage.FilterFunc) (watch.Interface, error) { if ctx == nil { glog.Errorf("Context is nil") } + watchRV, err := storage.ParseWatchResourceVersion(resourceVersion) + if err != nil { + return nil, err + } key = h.prefixEtcdKey(key) w := newEtcdWatcher(false, nil, filter, h.codec, h.versioner, nil, h) - go w.etcdWatch(h.client, key, resourceVersion) + go w.etcdWatch(h.client, key, watchRV) return w, nil } // Implements storage.Interface. -func (h *etcdHelper) WatchList(ctx context.Context, key string, resourceVersion uint64, filter storage.FilterFunc) (watch.Interface, error) { +func (h *etcdHelper) WatchList(ctx context.Context, key string, resourceVersion string, filter storage.FilterFunc) (watch.Interface, error) { if ctx == nil { glog.Errorf("Context is nil") } + watchRV, err := storage.ParseWatchResourceVersion(resourceVersion) + if err != nil { + return nil, err + } key = h.prefixEtcdKey(key) w := newEtcdWatcher(true, exceptKey(key), filter, h.codec, h.versioner, nil, h) - go w.etcdWatch(h.client, key, resourceVersion) + go w.etcdWatch(h.client, key, watchRV) return w, nil } @@ -352,7 +360,7 @@ func (h *etcdHelper) decodeNodeList(nodes []*etcd.Node, filter storage.FilterFun } // Implements storage.Interface. -func (h *etcdHelper) List(ctx context.Context, key string, resourceVersion uint64, filter storage.FilterFunc, listObj runtime.Object) error { +func (h *etcdHelper) List(ctx context.Context, key string, resourceVersion string, filter storage.FilterFunc, listObj runtime.Object) error { if ctx == nil { glog.Errorf("Context is nil") } diff --git a/pkg/storage/etcd/etcd_helper_test.go b/pkg/storage/etcd/etcd_helper_test.go index 41319b2dd84..971adfd0f76 100644 --- a/pkg/storage/etcd/etcd_helper_test.go +++ b/pkg/storage/etcd/etcd_helper_test.go @@ -117,7 +117,7 @@ func TestList(t *testing.T) { var got api.PodList // TODO: a sorted filter function could be applied such implied // ordering on the returned list doesn't matter. - err := helper.List(context.TODO(), key, 0, storage.Everything, &got) + err := helper.List(context.TODO(), key, "", storage.Everything, &got) if err != nil { t.Errorf("Unexpected error %v", err) } @@ -156,7 +156,7 @@ func TestListFiltered(t *testing.T) { } var got api.PodList - err := helper.List(context.TODO(), key, 0, filter, &got) + err := helper.List(context.TODO(), key, "", filter, &got) if err != nil { t.Errorf("Unexpected error %v", err) } @@ -206,7 +206,7 @@ func TestListAcrossDirectories(t *testing.T) { list.Items[2] = *returnedObj var got api.PodList - err := roothelper.List(context.TODO(), rootkey, 0, storage.Everything, &got) + err := roothelper.List(context.TODO(), rootkey, "", storage.Everything, &got) if err != nil { t.Errorf("Unexpected error %v", err) } diff --git a/pkg/storage/etcd/etcd_watcher_test.go b/pkg/storage/etcd/etcd_watcher_test.go index c71381e6919..561998bac22 100644 --- a/pkg/storage/etcd/etcd_watcher_test.go +++ b/pkg/storage/etcd/etcd_watcher_test.go @@ -243,7 +243,7 @@ func TestWatch(t *testing.T) { key := "/some/key" h := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix()) - watching, err := h.Watch(context.TODO(), key, 0, storage.Everything) + watching, err := h.Watch(context.TODO(), key, "0", storage.Everything) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -289,7 +289,7 @@ func TestWatchEtcdState(t *testing.T) { defer server.Terminate(t) h := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix()) - watching, err := h.Watch(context.TODO(), key, 0, storage.Everything) + watching, err := h.Watch(context.TODO(), key, "0", storage.Everything) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -353,7 +353,7 @@ func TestWatchFromZeroIndex(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } - watching, err := h.Watch(context.TODO(), key, 0, storage.Everything) + watching, err := h.Watch(context.TODO(), key, "0", storage.Everything) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -388,7 +388,7 @@ func TestWatchListFromZeroIndex(t *testing.T) { defer server.Terminate(t) h := newEtcdHelper(server.Client, codec, key) - watching, err := h.WatchList(context.TODO(), key, 0, storage.Everything) + watching, err := h.WatchList(context.TODO(), key, "0", storage.Everything) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -420,7 +420,7 @@ func TestWatchListIgnoresRootKey(t *testing.T) { defer server.Terminate(t) h := newEtcdHelper(server.Client, codec, key) - watching, err := h.WatchList(context.TODO(), key, 0, storage.Everything) + watching, err := h.WatchList(context.TODO(), key, "0", storage.Everything) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -451,7 +451,7 @@ func TestWatchPurposefulShutdown(t *testing.T) { h := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix()) // Test purposeful shutdown - watching, err := h.Watch(context.TODO(), key, 0, storage.Everything) + watching, err := h.Watch(context.TODO(), key, "0", storage.Everything) if err != nil { t.Fatalf("Unexpected error: %v", err) } diff --git a/pkg/storage/interfaces.go b/pkg/storage/interfaces.go index 79b5fa67cca..acddffcf91e 100644 --- a/pkg/storage/interfaces.go +++ b/pkg/storage/interfaces.go @@ -97,13 +97,13 @@ type Interface interface { // and any items passing 'filter' are sent down to returned watch.Interface. // resourceVersion may be used to specify what version to begin watching // (e.g. reconnecting without missing any updates). - Watch(ctx context.Context, key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) + Watch(ctx context.Context, key string, resourceVersion string, filter FilterFunc) (watch.Interface, error) // WatchList begins watching the specified key's items. Items are decoded into API // objects and any item passing 'filter' are sent down to returned watch.Interface. // resourceVersion may be used to specify what version to begin watching // (e.g. reconnecting without missing any updates). - WatchList(ctx context.Context, key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) + WatchList(ctx context.Context, key string, resourceVersion string, filter FilterFunc) (watch.Interface, error) // Get unmarshals json found at key into objPtr. On a not found error, will either // return a zero object of the requested type, or an error, depending on ignoreNotFound. @@ -118,7 +118,7 @@ type Interface interface { // into *List api object (an object that satisfies runtime.IsList definition). // The returned contents may be delayed, but it is guaranteed that they will // be have at least 'resourceVersion'. - List(ctx context.Context, key string, resourceVersion uint64, filter FilterFunc, listObj runtime.Object) error + List(ctx context.Context, key string, resourceVersion string, filter FilterFunc, listObj runtime.Object) error // GuaranteedUpdate keeps calling 'tryUpdate()' to update key 'key' (of type 'ptrToType') // retrying the update until success if there is index conflict. diff --git a/pkg/storage/util.go b/pkg/storage/util.go index 0b8fe7a3b0e..2f24cf276ae 100644 --- a/pkg/storage/util.go +++ b/pkg/storage/util.go @@ -41,13 +41,13 @@ func SimpleUpdate(fn SimpleUpdateFunc) UpdateFunc { // the etcd version we should pass to helper.Watch(). Because resourceVersion is // an opaque value, the default watch behavior for non-zero watch is to watch // the next value (if you pass "1", you will see updates from "2" onwards). -func ParseWatchResourceVersion(resourceVersion, kind string) (uint64, error) { +func ParseWatchResourceVersion(resourceVersion string) (uint64, error) { if resourceVersion == "" || resourceVersion == "0" { return 0, nil } version, err := strconv.ParseUint(resourceVersion, 10, 64) if err != nil { - return 0, errors.NewInvalid(kind, "", utilvalidation.ErrorList{ + return 0, errors.NewInvalid("", "", utilvalidation.ErrorList{ // Validation errors are supposed to return version-specific field // paths, but this is probably close enough. utilvalidation.NewInvalidError(utilvalidation.NewFieldPath("resourceVersion"), resourceVersion, err.Error()), @@ -56,6 +56,16 @@ func ParseWatchResourceVersion(resourceVersion, kind string) (uint64, error) { return version + 1, nil } +// ParseListResourceVersion takes a resource version argument and converts it to +// the etcd version. +func ParseListResourceVersion(resourceVersion string) (uint64, error) { + if resourceVersion == "" { + return 0, nil + } + version, err := strconv.ParseUint(resourceVersion, 10, 64) + return version, err +} + func NamespaceKeyFunc(prefix string, obj runtime.Object) (string, error) { meta, err := meta.Accessor(obj) if err != nil { diff --git a/pkg/storage/util_test.go b/pkg/storage/util_test.go index 1de4f106793..22768ce1afa 100644 --- a/pkg/storage/util_test.go +++ b/pkg/storage/util_test.go @@ -25,7 +25,6 @@ import ( func TestEtcdParseWatchResourceVersion(t *testing.T) { testCases := []struct { Version string - Kind string ExpectVersion uint64 Err bool }{ @@ -36,7 +35,7 @@ func TestEtcdParseWatchResourceVersion(t *testing.T) { {Version: "10", ExpectVersion: 11}, } for _, testCase := range testCases { - version, err := ParseWatchResourceVersion(testCase.Version, testCase.Kind) + version, err := ParseWatchResourceVersion(testCase.Version) switch { case testCase.Err: if err == nil { diff --git a/test/integration/etcd_tools_test.go b/test/integration/etcd_tools_test.go index 180ae6cb73f..ca61850c433 100644 --- a/test/integration/etcd_tools_test.go +++ b/test/integration/etcd_tools_test.go @@ -151,7 +151,7 @@ func TestWatch(t *testing.T) { expectedVersion := resp.Node.ModifiedIndex // watch should load the object at the current index - w, err := etcdStorage.Watch(ctx, key, 0, storage.Everything) + w, err := etcdStorage.Watch(ctx, key, "0", storage.Everything) if err != nil { t.Fatalf("Unexpected error: %v", err) }