mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-04 09:49:50 +00:00
Merge pull request #18207 from wojtek-t/string_resource_version
Change resourceVersion to string in storage.Interface
This commit is contained in:
commit
0369805308
@ -188,11 +188,7 @@ func (e *Etcd) ListPredicate(ctx api.Context, m generic.Matcher, options *unvers
|
|||||||
if options == nil {
|
if options == nil {
|
||||||
options = &unversioned.ListOptions{ResourceVersion: "0"}
|
options = &unversioned.ListOptions{ResourceVersion: "0"}
|
||||||
}
|
}
|
||||||
version, err := storage.ParseWatchResourceVersion(options.ResourceVersion, e.EndpointName)
|
err := e.Storage.List(ctx, e.KeyRootFunc(ctx), options.ResourceVersion, filterFunc, list)
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
err = e.Storage.List(ctx, e.KeyRootFunc(ctx), version, filterFunc, list)
|
|
||||||
return list, etcderr.InterpretListError(err, e.EndpointName)
|
return list, etcderr.InterpretListError(err, e.EndpointName)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -479,10 +475,6 @@ func (e *Etcd) Watch(ctx api.Context, options *unversioned.ListOptions) (watch.I
|
|||||||
|
|
||||||
// WatchPredicate starts a watch for the items that m matches.
|
// WatchPredicate starts a watch for the items that m matches.
|
||||||
func (e *Etcd) WatchPredicate(ctx api.Context, m generic.Matcher, resourceVersion string) (watch.Interface, error) {
|
func (e *Etcd) WatchPredicate(ctx api.Context, m generic.Matcher, resourceVersion string) (watch.Interface, error) {
|
||||||
version, err := storage.ParseWatchResourceVersion(resourceVersion, e.EndpointName)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
filterFunc := e.filterAndDecorateFunction(m)
|
filterFunc := e.filterAndDecorateFunction(m)
|
||||||
|
|
||||||
if name, ok := m.MatchesSingle(); ok {
|
if name, ok := m.MatchesSingle(); ok {
|
||||||
@ -490,12 +482,12 @@ func (e *Etcd) WatchPredicate(ctx api.Context, m generic.Matcher, resourceVersio
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return e.Storage.Watch(ctx, key, version, filterFunc)
|
return e.Storage.Watch(ctx, key, resourceVersion, filterFunc)
|
||||||
}
|
}
|
||||||
// if we cannot extract a key based on the current context, the optimization is skipped
|
// if we cannot extract a key based on the current context, the optimization is skipped
|
||||||
}
|
}
|
||||||
|
|
||||||
return e.Storage.WatchList(ctx, e.KeyRootFunc(ctx), version, filterFunc)
|
return e.Storage.WatchList(ctx, e.KeyRootFunc(ctx), resourceVersion, filterFunc)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *Etcd) filterAndDecorateFunction(m generic.Matcher) func(runtime.Object) bool {
|
func (e *Etcd) filterAndDecorateFunction(m generic.Matcher) func(runtime.Object) bool {
|
||||||
|
@ -223,7 +223,12 @@ func (c *Cacher) Delete(ctx context.Context, key string, out runtime.Object) err
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Implements storage.Interface.
|
// Implements storage.Interface.
|
||||||
func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) {
|
func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string, filter FilterFunc) (watch.Interface, error) {
|
||||||
|
watchRV, err := ParseWatchResourceVersion(resourceVersion)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
// Do NOT allow Watch to start when the underlying structures are not propagated.
|
// Do NOT allow Watch to start when the underlying structures are not propagated.
|
||||||
c.usable.RLock()
|
c.usable.RLock()
|
||||||
defer c.usable.RUnlock()
|
defer c.usable.RUnlock()
|
||||||
@ -235,7 +240,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion uint64,
|
|||||||
// underlying watchCache is calling processEvent under its lock.
|
// underlying watchCache is calling processEvent under its lock.
|
||||||
c.watchCache.RLock()
|
c.watchCache.RLock()
|
||||||
defer c.watchCache.RUnlock()
|
defer c.watchCache.RUnlock()
|
||||||
initEvents, err := c.watchCache.GetAllEventsSinceThreadUnsafe(resourceVersion)
|
initEvents, err := c.watchCache.GetAllEventsSinceThreadUnsafe(watchRV)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -249,7 +254,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion uint64,
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Implements storage.Interface.
|
// Implements storage.Interface.
|
||||||
func (c *Cacher) WatchList(ctx context.Context, key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) {
|
func (c *Cacher) WatchList(ctx context.Context, key string, resourceVersion string, filter FilterFunc) (watch.Interface, error) {
|
||||||
return c.Watch(ctx, key, resourceVersion, filter)
|
return c.Watch(ctx, key, resourceVersion, filter)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -264,11 +269,16 @@ func (c *Cacher) GetToList(ctx context.Context, key string, filter FilterFunc, l
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Implements storage.Interface.
|
// Implements storage.Interface.
|
||||||
func (c *Cacher) List(ctx context.Context, key string, resourceVersion uint64, filter FilterFunc, listObj runtime.Object) error {
|
func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, filter FilterFunc, listObj runtime.Object) error {
|
||||||
if !c.ListFromCache {
|
if !c.ListFromCache {
|
||||||
return c.storage.List(ctx, key, resourceVersion, filter, listObj)
|
return c.storage.List(ctx, key, resourceVersion, filter, listObj)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
listRV, err := ParseListResourceVersion(resourceVersion)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
// To avoid situation when List is proceesed before the underlying
|
// To avoid situation when List is proceesed before the underlying
|
||||||
// watchCache is propagated for the first time, we acquire and immediately
|
// watchCache is propagated for the first time, we acquire and immediately
|
||||||
// release the 'usable' lock.
|
// release the 'usable' lock.
|
||||||
@ -277,7 +287,7 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion uint64, f
|
|||||||
c.usable.RLock()
|
c.usable.RLock()
|
||||||
c.usable.RUnlock()
|
c.usable.RUnlock()
|
||||||
|
|
||||||
// List elements from cache, with at least 'resourceVersion'.
|
// List elements from cache, with at least 'listRV'.
|
||||||
listPtr, err := meta.GetItemsPtr(listObj)
|
listPtr, err := meta.GetItemsPtr(listObj)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -288,7 +298,7 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion uint64, f
|
|||||||
}
|
}
|
||||||
filterFunc := filterFunction(key, c.keyFunc, filter)
|
filterFunc := filterFunction(key, c.keyFunc, filter)
|
||||||
|
|
||||||
objs, resourceVersion := c.watchCache.WaitUntilFreshAndList(resourceVersion)
|
objs, readResourceVersion := c.watchCache.WaitUntilFreshAndList(listRV)
|
||||||
for _, obj := range objs {
|
for _, obj := range objs {
|
||||||
object, ok := obj.(runtime.Object)
|
object, ok := obj.(runtime.Object)
|
||||||
if !ok {
|
if !ok {
|
||||||
@ -299,7 +309,7 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion uint64, f
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if c.versioner != nil {
|
if c.versioner != nil {
|
||||||
if err := c.versioner.UpdateList(listObj, resourceVersion); err != nil {
|
if err := c.versioner.UpdateList(listObj, readResourceVersion); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -389,7 +399,7 @@ func newCacherListerWatcher(storage Interface, resourcePrefix string, newListFun
|
|||||||
// Implements cache.ListerWatcher interface.
|
// Implements cache.ListerWatcher interface.
|
||||||
func (lw *cacherListerWatcher) List(options unversioned.ListOptions) (runtime.Object, error) {
|
func (lw *cacherListerWatcher) List(options unversioned.ListOptions) (runtime.Object, error) {
|
||||||
list := lw.newListFunc()
|
list := lw.newListFunc()
|
||||||
if err := lw.storage.List(context.TODO(), lw.resourcePrefix, 0, Everything, list); err != nil {
|
if err := lw.storage.List(context.TODO(), lw.resourcePrefix, "", Everything, list); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return list, nil
|
return list, nil
|
||||||
@ -397,11 +407,7 @@ func (lw *cacherListerWatcher) List(options unversioned.ListOptions) (runtime.Ob
|
|||||||
|
|
||||||
// Implements cache.ListerWatcher interface.
|
// Implements cache.ListerWatcher interface.
|
||||||
func (lw *cacherListerWatcher) Watch(options unversioned.ListOptions) (watch.Interface, error) {
|
func (lw *cacherListerWatcher) Watch(options unversioned.ListOptions) (watch.Interface, error) {
|
||||||
version, err := ParseWatchResourceVersion(options.ResourceVersion, lw.resourcePrefix)
|
return lw.storage.WatchList(context.TODO(), lw.resourcePrefix, options.ResourceVersion, Everything)
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return lw.storage.WatchList(context.TODO(), lw.resourcePrefix, version, Everything)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// cacherWatch implements watch.Interface
|
// cacherWatch implements watch.Interface
|
||||||
|
@ -114,7 +114,7 @@ func TestList(t *testing.T) {
|
|||||||
result := &api.PodList{}
|
result := &api.PodList{}
|
||||||
// TODO: We need to pass ResourceVersion of barPod deletion operation.
|
// TODO: We need to pass ResourceVersion of barPod deletion operation.
|
||||||
// However, there is no easy way to get it, so it is hardcoded to 8.
|
// However, there is no easy way to get it, so it is hardcoded to 8.
|
||||||
if err := cacher.List(context.TODO(), "pods/ns", 8, storage.Everything, result); err != nil {
|
if err := cacher.List(context.TODO(), "pods/ns", "8", storage.Everything, result); err != nil {
|
||||||
t.Errorf("Unexpected error: %v", err)
|
t.Errorf("Unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
if result.ListMeta.ResourceVersion != "8" {
|
if result.ListMeta.ResourceVersion != "8" {
|
||||||
@ -179,7 +179,7 @@ func TestWatch(t *testing.T) {
|
|||||||
podFooBis.Spec.NodeName = "anotherFakeNode"
|
podFooBis.Spec.NodeName = "anotherFakeNode"
|
||||||
|
|
||||||
// Set up Watch for object "podFoo".
|
// Set up Watch for object "podFoo".
|
||||||
watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", 1, storage.Everything)
|
watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", "1", storage.Everything)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Unexpected error: %v", err)
|
t.Fatalf("Unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
@ -192,17 +192,19 @@ func TestWatch(t *testing.T) {
|
|||||||
verifyWatchEvent(t, watcher, watch.Modified, podFooPrime)
|
verifyWatchEvent(t, watcher, watch.Modified, podFooPrime)
|
||||||
|
|
||||||
// Check whether we get too-old error.
|
// Check whether we get too-old error.
|
||||||
_, err = cacher.Watch(context.TODO(), "pods/ns/foo", 1, storage.Everything)
|
_, err = cacher.Watch(context.TODO(), "pods/ns/foo", "1", storage.Everything)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
t.Errorf("Expected 'error too old' error")
|
t.Errorf("Expected 'error too old' error")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Now test watch with initial state.
|
// Now test watch with initial state.
|
||||||
|
// We want to observe fooCreation too, so need to pass smaller resource version.
|
||||||
initialVersion, err := strconv.Atoi(fooCreated.ResourceVersion)
|
initialVersion, err := strconv.Atoi(fooCreated.ResourceVersion)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Incorrect resourceVersion: %s", fooCreated.ResourceVersion)
|
t.Fatalf("Incorrect resourceVersion: %s", fooCreated.ResourceVersion)
|
||||||
}
|
}
|
||||||
initialWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", uint64(initialVersion), storage.Everything)
|
initialVersion--
|
||||||
|
initialWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", strconv.Itoa(initialVersion), storage.Everything)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Unexpected error: %v", err)
|
t.Fatalf("Unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
@ -211,7 +213,7 @@ func TestWatch(t *testing.T) {
|
|||||||
verifyWatchEvent(t, initialWatcher, watch.Modified, podFooPrime)
|
verifyWatchEvent(t, initialWatcher, watch.Modified, podFooPrime)
|
||||||
|
|
||||||
// Now test watch from "now".
|
// Now test watch from "now".
|
||||||
nowWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", 0, storage.Everything)
|
nowWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", "0", storage.Everything)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Unexpected error: %v", err)
|
t.Fatalf("Unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
@ -229,14 +231,14 @@ func TestWatcherTimeout(t *testing.T) {
|
|||||||
cacher := newTestCacher(etcdStorage)
|
cacher := newTestCacher(etcdStorage)
|
||||||
|
|
||||||
// Create a watcher that will not be reading any result.
|
// Create a watcher that will not be reading any result.
|
||||||
watcher, err := cacher.WatchList(context.TODO(), "pods/ns", 1, storage.Everything)
|
watcher, err := cacher.WatchList(context.TODO(), "pods/ns", "1", storage.Everything)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Unexpected error: %v", err)
|
t.Fatalf("Unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
defer watcher.Stop()
|
defer watcher.Stop()
|
||||||
|
|
||||||
// Create a second watcher that will be reading result.
|
// Create a second watcher that will be reading result.
|
||||||
readingWatcher, err := cacher.WatchList(context.TODO(), "pods/ns", 1, storage.Everything)
|
readingWatcher, err := cacher.WatchList(context.TODO(), "pods/ns", "1", storage.Everything)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Unexpected error: %v", err)
|
t.Fatalf("Unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
@ -281,11 +283,13 @@ func TestFiltering(t *testing.T) {
|
|||||||
}
|
}
|
||||||
return selector.Matches(labels.Set(metadata.Labels()))
|
return selector.Matches(labels.Set(metadata.Labels()))
|
||||||
}
|
}
|
||||||
|
// We want to observe fooCreation too, so need to pass smaller resource version.
|
||||||
initialVersion, err := strconv.Atoi(fooCreated.ResourceVersion)
|
initialVersion, err := strconv.Atoi(fooCreated.ResourceVersion)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Incorrect resourceVersion: %s", fooCreated.ResourceVersion)
|
t.Fatalf("Incorrect resourceVersion: %s", fooCreated.ResourceVersion)
|
||||||
}
|
}
|
||||||
watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", uint64(initialVersion), filter)
|
initialVersion--
|
||||||
|
watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", strconv.Itoa(initialVersion), filter)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Unexpected error: %v", err)
|
t.Fatalf("Unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -191,24 +191,32 @@ func (h *etcdHelper) Delete(ctx context.Context, key string, out runtime.Object)
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Implements storage.Interface.
|
// Implements storage.Interface.
|
||||||
func (h *etcdHelper) Watch(ctx context.Context, key string, resourceVersion uint64, filter storage.FilterFunc) (watch.Interface, error) {
|
func (h *etcdHelper) Watch(ctx context.Context, key string, resourceVersion string, filter storage.FilterFunc) (watch.Interface, error) {
|
||||||
if ctx == nil {
|
if ctx == nil {
|
||||||
glog.Errorf("Context is nil")
|
glog.Errorf("Context is nil")
|
||||||
}
|
}
|
||||||
|
watchRV, err := storage.ParseWatchResourceVersion(resourceVersion)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
key = h.prefixEtcdKey(key)
|
key = h.prefixEtcdKey(key)
|
||||||
w := newEtcdWatcher(false, nil, filter, h.codec, h.versioner, nil, h)
|
w := newEtcdWatcher(false, nil, filter, h.codec, h.versioner, nil, h)
|
||||||
go w.etcdWatch(h.client, key, resourceVersion)
|
go w.etcdWatch(h.client, key, watchRV)
|
||||||
return w, nil
|
return w, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Implements storage.Interface.
|
// Implements storage.Interface.
|
||||||
func (h *etcdHelper) WatchList(ctx context.Context, key string, resourceVersion uint64, filter storage.FilterFunc) (watch.Interface, error) {
|
func (h *etcdHelper) WatchList(ctx context.Context, key string, resourceVersion string, filter storage.FilterFunc) (watch.Interface, error) {
|
||||||
if ctx == nil {
|
if ctx == nil {
|
||||||
glog.Errorf("Context is nil")
|
glog.Errorf("Context is nil")
|
||||||
}
|
}
|
||||||
|
watchRV, err := storage.ParseWatchResourceVersion(resourceVersion)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
key = h.prefixEtcdKey(key)
|
key = h.prefixEtcdKey(key)
|
||||||
w := newEtcdWatcher(true, exceptKey(key), filter, h.codec, h.versioner, nil, h)
|
w := newEtcdWatcher(true, exceptKey(key), filter, h.codec, h.versioner, nil, h)
|
||||||
go w.etcdWatch(h.client, key, resourceVersion)
|
go w.etcdWatch(h.client, key, watchRV)
|
||||||
return w, nil
|
return w, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -352,7 +360,7 @@ func (h *etcdHelper) decodeNodeList(nodes []*etcd.Node, filter storage.FilterFun
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Implements storage.Interface.
|
// Implements storage.Interface.
|
||||||
func (h *etcdHelper) List(ctx context.Context, key string, resourceVersion uint64, filter storage.FilterFunc, listObj runtime.Object) error {
|
func (h *etcdHelper) List(ctx context.Context, key string, resourceVersion string, filter storage.FilterFunc, listObj runtime.Object) error {
|
||||||
if ctx == nil {
|
if ctx == nil {
|
||||||
glog.Errorf("Context is nil")
|
glog.Errorf("Context is nil")
|
||||||
}
|
}
|
||||||
|
@ -117,7 +117,7 @@ func TestList(t *testing.T) {
|
|||||||
var got api.PodList
|
var got api.PodList
|
||||||
// TODO: a sorted filter function could be applied such implied
|
// TODO: a sorted filter function could be applied such implied
|
||||||
// ordering on the returned list doesn't matter.
|
// ordering on the returned list doesn't matter.
|
||||||
err := helper.List(context.TODO(), key, 0, storage.Everything, &got)
|
err := helper.List(context.TODO(), key, "", storage.Everything, &got)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("Unexpected error %v", err)
|
t.Errorf("Unexpected error %v", err)
|
||||||
}
|
}
|
||||||
@ -156,7 +156,7 @@ func TestListFiltered(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
var got api.PodList
|
var got api.PodList
|
||||||
err := helper.List(context.TODO(), key, 0, filter, &got)
|
err := helper.List(context.TODO(), key, "", filter, &got)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("Unexpected error %v", err)
|
t.Errorf("Unexpected error %v", err)
|
||||||
}
|
}
|
||||||
@ -206,7 +206,7 @@ func TestListAcrossDirectories(t *testing.T) {
|
|||||||
list.Items[2] = *returnedObj
|
list.Items[2] = *returnedObj
|
||||||
|
|
||||||
var got api.PodList
|
var got api.PodList
|
||||||
err := roothelper.List(context.TODO(), rootkey, 0, storage.Everything, &got)
|
err := roothelper.List(context.TODO(), rootkey, "", storage.Everything, &got)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("Unexpected error %v", err)
|
t.Errorf("Unexpected error %v", err)
|
||||||
}
|
}
|
||||||
|
@ -243,7 +243,7 @@ func TestWatch(t *testing.T) {
|
|||||||
key := "/some/key"
|
key := "/some/key"
|
||||||
h := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix())
|
h := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix())
|
||||||
|
|
||||||
watching, err := h.Watch(context.TODO(), key, 0, storage.Everything)
|
watching, err := h.Watch(context.TODO(), key, "0", storage.Everything)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Unexpected error: %v", err)
|
t.Fatalf("Unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
@ -289,7 +289,7 @@ func TestWatchEtcdState(t *testing.T) {
|
|||||||
defer server.Terminate(t)
|
defer server.Terminate(t)
|
||||||
|
|
||||||
h := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix())
|
h := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix())
|
||||||
watching, err := h.Watch(context.TODO(), key, 0, storage.Everything)
|
watching, err := h.Watch(context.TODO(), key, "0", storage.Everything)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Unexpected error: %v", err)
|
t.Fatalf("Unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
@ -353,7 +353,7 @@ func TestWatchFromZeroIndex(t *testing.T) {
|
|||||||
t.Fatalf("Unexpected error: %v", err)
|
t.Fatalf("Unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
watching, err := h.Watch(context.TODO(), key, 0, storage.Everything)
|
watching, err := h.Watch(context.TODO(), key, "0", storage.Everything)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Unexpected error: %v", err)
|
t.Fatalf("Unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
@ -388,7 +388,7 @@ func TestWatchListFromZeroIndex(t *testing.T) {
|
|||||||
defer server.Terminate(t)
|
defer server.Terminate(t)
|
||||||
h := newEtcdHelper(server.Client, codec, key)
|
h := newEtcdHelper(server.Client, codec, key)
|
||||||
|
|
||||||
watching, err := h.WatchList(context.TODO(), key, 0, storage.Everything)
|
watching, err := h.WatchList(context.TODO(), key, "0", storage.Everything)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Unexpected error: %v", err)
|
t.Fatalf("Unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
@ -420,7 +420,7 @@ func TestWatchListIgnoresRootKey(t *testing.T) {
|
|||||||
defer server.Terminate(t)
|
defer server.Terminate(t)
|
||||||
h := newEtcdHelper(server.Client, codec, key)
|
h := newEtcdHelper(server.Client, codec, key)
|
||||||
|
|
||||||
watching, err := h.WatchList(context.TODO(), key, 0, storage.Everything)
|
watching, err := h.WatchList(context.TODO(), key, "0", storage.Everything)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Unexpected error: %v", err)
|
t.Fatalf("Unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
@ -451,7 +451,7 @@ func TestWatchPurposefulShutdown(t *testing.T) {
|
|||||||
h := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix())
|
h := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix())
|
||||||
|
|
||||||
// Test purposeful shutdown
|
// Test purposeful shutdown
|
||||||
watching, err := h.Watch(context.TODO(), key, 0, storage.Everything)
|
watching, err := h.Watch(context.TODO(), key, "0", storage.Everything)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Unexpected error: %v", err)
|
t.Fatalf("Unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -97,13 +97,13 @@ type Interface interface {
|
|||||||
// and any items passing 'filter' are sent down to returned watch.Interface.
|
// and any items passing 'filter' are sent down to returned watch.Interface.
|
||||||
// resourceVersion may be used to specify what version to begin watching
|
// resourceVersion may be used to specify what version to begin watching
|
||||||
// (e.g. reconnecting without missing any updates).
|
// (e.g. reconnecting without missing any updates).
|
||||||
Watch(ctx context.Context, key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error)
|
Watch(ctx context.Context, key string, resourceVersion string, filter FilterFunc) (watch.Interface, error)
|
||||||
|
|
||||||
// WatchList begins watching the specified key's items. Items are decoded into API
|
// WatchList begins watching the specified key's items. Items are decoded into API
|
||||||
// objects and any item passing 'filter' are sent down to returned watch.Interface.
|
// objects and any item passing 'filter' are sent down to returned watch.Interface.
|
||||||
// resourceVersion may be used to specify what version to begin watching
|
// resourceVersion may be used to specify what version to begin watching
|
||||||
// (e.g. reconnecting without missing any updates).
|
// (e.g. reconnecting without missing any updates).
|
||||||
WatchList(ctx context.Context, key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error)
|
WatchList(ctx context.Context, key string, resourceVersion string, filter FilterFunc) (watch.Interface, error)
|
||||||
|
|
||||||
// Get unmarshals json found at key into objPtr. On a not found error, will either
|
// Get unmarshals json found at key into objPtr. On a not found error, will either
|
||||||
// return a zero object of the requested type, or an error, depending on ignoreNotFound.
|
// return a zero object of the requested type, or an error, depending on ignoreNotFound.
|
||||||
@ -118,7 +118,7 @@ type Interface interface {
|
|||||||
// into *List api object (an object that satisfies runtime.IsList definition).
|
// into *List api object (an object that satisfies runtime.IsList definition).
|
||||||
// The returned contents may be delayed, but it is guaranteed that they will
|
// The returned contents may be delayed, but it is guaranteed that they will
|
||||||
// be have at least 'resourceVersion'.
|
// be have at least 'resourceVersion'.
|
||||||
List(ctx context.Context, key string, resourceVersion uint64, filter FilterFunc, listObj runtime.Object) error
|
List(ctx context.Context, key string, resourceVersion string, filter FilterFunc, listObj runtime.Object) error
|
||||||
|
|
||||||
// GuaranteedUpdate keeps calling 'tryUpdate()' to update key 'key' (of type 'ptrToType')
|
// GuaranteedUpdate keeps calling 'tryUpdate()' to update key 'key' (of type 'ptrToType')
|
||||||
// retrying the update until success if there is index conflict.
|
// retrying the update until success if there is index conflict.
|
||||||
|
@ -41,13 +41,13 @@ func SimpleUpdate(fn SimpleUpdateFunc) UpdateFunc {
|
|||||||
// the etcd version we should pass to helper.Watch(). Because resourceVersion is
|
// the etcd version we should pass to helper.Watch(). Because resourceVersion is
|
||||||
// an opaque value, the default watch behavior for non-zero watch is to watch
|
// an opaque value, the default watch behavior for non-zero watch is to watch
|
||||||
// the next value (if you pass "1", you will see updates from "2" onwards).
|
// the next value (if you pass "1", you will see updates from "2" onwards).
|
||||||
func ParseWatchResourceVersion(resourceVersion, kind string) (uint64, error) {
|
func ParseWatchResourceVersion(resourceVersion string) (uint64, error) {
|
||||||
if resourceVersion == "" || resourceVersion == "0" {
|
if resourceVersion == "" || resourceVersion == "0" {
|
||||||
return 0, nil
|
return 0, nil
|
||||||
}
|
}
|
||||||
version, err := strconv.ParseUint(resourceVersion, 10, 64)
|
version, err := strconv.ParseUint(resourceVersion, 10, 64)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, errors.NewInvalid(kind, "", utilvalidation.ErrorList{
|
return 0, errors.NewInvalid("", "", utilvalidation.ErrorList{
|
||||||
// Validation errors are supposed to return version-specific field
|
// Validation errors are supposed to return version-specific field
|
||||||
// paths, but this is probably close enough.
|
// paths, but this is probably close enough.
|
||||||
utilvalidation.NewInvalidError(utilvalidation.NewFieldPath("resourceVersion"), resourceVersion, err.Error()),
|
utilvalidation.NewInvalidError(utilvalidation.NewFieldPath("resourceVersion"), resourceVersion, err.Error()),
|
||||||
@ -56,6 +56,16 @@ func ParseWatchResourceVersion(resourceVersion, kind string) (uint64, error) {
|
|||||||
return version + 1, nil
|
return version + 1, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ParseListResourceVersion takes a resource version argument and converts it to
|
||||||
|
// the etcd version.
|
||||||
|
func ParseListResourceVersion(resourceVersion string) (uint64, error) {
|
||||||
|
if resourceVersion == "" {
|
||||||
|
return 0, nil
|
||||||
|
}
|
||||||
|
version, err := strconv.ParseUint(resourceVersion, 10, 64)
|
||||||
|
return version, err
|
||||||
|
}
|
||||||
|
|
||||||
func NamespaceKeyFunc(prefix string, obj runtime.Object) (string, error) {
|
func NamespaceKeyFunc(prefix string, obj runtime.Object) (string, error) {
|
||||||
meta, err := meta.Accessor(obj)
|
meta, err := meta.Accessor(obj)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -25,7 +25,6 @@ import (
|
|||||||
func TestEtcdParseWatchResourceVersion(t *testing.T) {
|
func TestEtcdParseWatchResourceVersion(t *testing.T) {
|
||||||
testCases := []struct {
|
testCases := []struct {
|
||||||
Version string
|
Version string
|
||||||
Kind string
|
|
||||||
ExpectVersion uint64
|
ExpectVersion uint64
|
||||||
Err bool
|
Err bool
|
||||||
}{
|
}{
|
||||||
@ -36,7 +35,7 @@ func TestEtcdParseWatchResourceVersion(t *testing.T) {
|
|||||||
{Version: "10", ExpectVersion: 11},
|
{Version: "10", ExpectVersion: 11},
|
||||||
}
|
}
|
||||||
for _, testCase := range testCases {
|
for _, testCase := range testCases {
|
||||||
version, err := ParseWatchResourceVersion(testCase.Version, testCase.Kind)
|
version, err := ParseWatchResourceVersion(testCase.Version)
|
||||||
switch {
|
switch {
|
||||||
case testCase.Err:
|
case testCase.Err:
|
||||||
if err == nil {
|
if err == nil {
|
||||||
|
@ -151,7 +151,7 @@ func TestWatch(t *testing.T) {
|
|||||||
expectedVersion := resp.Node.ModifiedIndex
|
expectedVersion := resp.Node.ModifiedIndex
|
||||||
|
|
||||||
// watch should load the object at the current index
|
// watch should load the object at the current index
|
||||||
w, err := etcdStorage.Watch(ctx, key, 0, storage.Everything)
|
w, err := etcdStorage.Watch(ctx, key, "0", storage.Everything)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Unexpected error: %v", err)
|
t.Fatalf("Unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user