Merge pull request #58101 from lavalamp/scramble-rv

Automatic merge from submit-queue (batch tested with PRs 58518, 58771, 58101, 56829). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

Resource version parsing should all be in one place

This is 100% refactoring that ought to be a no-op at run time. It puts resource version parsing in a single file. Doing this because I want to follow up with a change which will make it obvious to users of the system our rules about resource version. Don't want to mix that with this refactor (on the off chance it gets rolled back).

Part of: #58112

```release-note
NONE
```
This commit is contained in:
Kubernetes Submit Queue 2018-01-26 01:03:38 -08:00 committed by GitHub
commit d9ded43bbe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 134 additions and 108 deletions

View File

@ -20,7 +20,6 @@ import (
"fmt" "fmt"
"net/http" "net/http"
"reflect" "reflect"
"strconv"
"sync" "sync"
"time" "time"
@ -290,7 +289,7 @@ func (c *Cacher) Delete(ctx context.Context, key string, out runtime.Object, pre
// Implements storage.Interface. // Implements storage.Interface.
func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string, pred SelectionPredicate) (watch.Interface, error) { func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string, pred SelectionPredicate) (watch.Interface, error) {
watchRV, err := ParseWatchResourceVersion(resourceVersion) watchRV, err := c.versioner.ParseWatchResourceVersion(resourceVersion)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -361,7 +360,7 @@ func (c *Cacher) Get(ctx context.Context, key string, resourceVersion string, ob
// If resourceVersion is specified, serve it from cache. // If resourceVersion is specified, serve it from cache.
// It's guaranteed that the returned value is at least that // It's guaranteed that the returned value is at least that
// fresh as the given resourceVersion. // fresh as the given resourceVersion.
getRV, err := ParseListResourceVersion(resourceVersion) getRV, err := c.versioner.ParseListResourceVersion(resourceVersion)
if err != nil { if err != nil {
return err return err
} }
@ -414,7 +413,7 @@ func (c *Cacher) GetToList(ctx context.Context, key string, resourceVersion stri
// If resourceVersion is specified, serve it from cache. // If resourceVersion is specified, serve it from cache.
// It's guaranteed that the returned value is at least that // It's guaranteed that the returned value is at least that
// fresh as the given resourceVersion. // fresh as the given resourceVersion.
listRV, err := ParseListResourceVersion(resourceVersion) listRV, err := c.versioner.ParseListResourceVersion(resourceVersion)
if err != nil { if err != nil {
return err return err
} }
@ -483,7 +482,7 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, p
// If resourceVersion is specified, serve it from cache. // If resourceVersion is specified, serve it from cache.
// It's guaranteed that the returned value is at least that // It's guaranteed that the returned value is at least that
// fresh as the given resourceVersion. // fresh as the given resourceVersion.
listRV, err := ParseListResourceVersion(resourceVersion) listRV, err := c.versioner.ParseListResourceVersion(resourceVersion)
if err != nil { if err != nil {
return err return err
} }
@ -711,11 +710,7 @@ func (c *Cacher) LastSyncResourceVersion() (uint64, error) {
c.ready.wait() c.ready.wait()
resourceVersion := c.reflector.LastSyncResourceVersion() resourceVersion := c.reflector.LastSyncResourceVersion()
if resourceVersion == "" { return c.versioner.ParseListResourceVersion(resourceVersion)
return 0, nil
}
return strconv.ParseUint(resourceVersion, 10, 64)
} }
// cacherListerWatcher opaques storage.Interface to expose cache.ListerWatcher. // cacherListerWatcher opaques storage.Interface to expose cache.ListerWatcher.

View File

@ -203,3 +203,9 @@ func (testVersioner) PrepareObjectForStorage(obj runtime.Object) error {
func (testVersioner) ObjectResourceVersion(obj runtime.Object) (uint64, error) { func (testVersioner) ObjectResourceVersion(obj runtime.Object) (uint64, error) {
return 0, fmt.Errorf("unimplemented") return 0, fmt.Errorf("unimplemented")
} }
func (testVersioner) ParseWatchResourceVersion(resourceVersion string) (uint64, error) {
return strconv.ParseUint(resourceVersion, 10, 64)
}
func (testVersioner) ParseListResourceVersion(resourceVersion string) (uint64, error) {
return strconv.ParseUint(resourceVersion, 10, 64)
}

View File

@ -58,6 +58,7 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/cache:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/cache:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/validation/field:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library", "//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
"//vendor/k8s.io/apiserver/pkg/storage:go_default_library", "//vendor/k8s.io/apiserver/pkg/storage:go_default_library",
"//vendor/k8s.io/apiserver/pkg/storage/etcd/metrics:go_default_library", "//vendor/k8s.io/apiserver/pkg/storage/etcd/metrics:go_default_library",

View File

@ -21,6 +21,7 @@ import (
"k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/validation/field"
"k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage"
) )
@ -81,6 +82,44 @@ func (a APIObjectVersioner) ObjectResourceVersion(obj runtime.Object) (uint64, e
return strconv.ParseUint(version, 10, 64) return strconv.ParseUint(version, 10, 64)
} }
// ParseWatchResourceVersion takes a resource version argument and converts it to
// the etcd version we should pass to helper.Watch(). Because resourceVersion is
// an opaque value, the default watch behavior for non-zero watch is to watch
// the next value (if you pass "1", you will see updates from "2" onwards).
func (a APIObjectVersioner) ParseWatchResourceVersion(resourceVersion string) (uint64, error) {
if resourceVersion == "" || resourceVersion == "0" {
return 0, nil
}
version, err := strconv.ParseUint(resourceVersion, 10, 64)
if err != nil {
return 0, storage.NewInvalidError(field.ErrorList{
// Validation errors are supposed to return version-specific field
// paths, but this is probably close enough.
field.Invalid(field.NewPath("resourceVersion"), resourceVersion, err.Error()),
})
}
return version, nil
}
// ParseListResourceVersion takes a resource version argument and converts it to
// the etcd version.
// TODO: reevaluate whether it is really clearer to have both this and the
// Watch version of this function, since they perform the same logic.
func (a APIObjectVersioner) ParseListResourceVersion(resourceVersion string) (uint64, error) {
if resourceVersion == "" {
return 0, nil
}
version, err := strconv.ParseUint(resourceVersion, 10, 64)
if err != nil {
return 0, storage.NewInvalidError(field.ErrorList{
// Validation errors are supposed to return version-specific field
// paths, but this is probably close enough.
field.Invalid(field.NewPath("resourceVersion"), resourceVersion, err.Error()),
})
}
return version, nil
}
// APIObjectVersioner implements Versioner // APIObjectVersioner implements Versioner
var Versioner storage.Versioner = APIObjectVersioner{} var Versioner storage.Versioner = APIObjectVersioner{}

View File

@ -20,6 +20,7 @@ import (
"testing" "testing"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apiserver/pkg/storage"
storagetesting "k8s.io/apiserver/pkg/storage/testing" storagetesting "k8s.io/apiserver/pkg/storage/testing"
) )
@ -40,6 +41,43 @@ func TestObjectVersioner(t *testing.T) {
} }
} }
func TestEtcdParseResourceVersion(t *testing.T) {
testCases := []struct {
Version string
ExpectVersion uint64
Err bool
}{
{Version: "", ExpectVersion: 0},
{Version: "a", Err: true},
{Version: " ", Err: true},
{Version: "1", ExpectVersion: 1},
{Version: "10", ExpectVersion: 10},
}
v := APIObjectVersioner{}
testFuncs := []func(string) (uint64, error){
v.ParseListResourceVersion,
v.ParseWatchResourceVersion,
}
for _, testCase := range testCases {
for i, f := range testFuncs {
version, err := f(testCase.Version)
switch {
case testCase.Err && err == nil:
t.Errorf("%s[%v]: unexpected non-error", testCase.Version, i)
case testCase.Err && !storage.IsInvalidError(err):
t.Errorf("%s[%v]: unexpected error: %v", testCase.Version, i, err)
case !testCase.Err && err != nil:
t.Errorf("%s[%v]: unexpected error: %v", testCase.Version, i, err)
}
if version != testCase.ExpectVersion {
t.Errorf("%s[%v]: expected version %d but was %d", testCase.Version, i, testCase.ExpectVersion, version)
}
}
}
}
func TestCompareResourceVersion(t *testing.T) { func TestCompareResourceVersion(t *testing.T) {
five := &storagetesting.TestResource{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "5"}} five := &storagetesting.TestResource{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "5"}}
six := &storagetesting.TestResource{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "6"}} six := &storagetesting.TestResource{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "6"}}

View File

@ -235,7 +235,7 @@ func (h *etcdHelper) Watch(ctx context.Context, key string, resourceVersion stri
if ctx == nil { if ctx == nil {
glog.Errorf("Context is nil") glog.Errorf("Context is nil")
} }
watchRV, err := storage.ParseWatchResourceVersion(resourceVersion) watchRV, err := h.versioner.ParseWatchResourceVersion(resourceVersion)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -250,7 +250,7 @@ func (h *etcdHelper) WatchList(ctx context.Context, key string, resourceVersion
if ctx == nil { if ctx == nil {
glog.Errorf("Context is nil") glog.Errorf("Context is nil")
} }
watchRV, err := storage.ParseWatchResourceVersion(resourceVersion) watchRV, err := h.versioner.ParseWatchResourceVersion(resourceVersion)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -24,7 +24,6 @@ import (
"fmt" "fmt"
"path" "path"
"reflect" "reflect"
"strconv"
"strings" "strings"
"time" "time"
@ -524,14 +523,14 @@ func (s *store) List(ctx context.Context, key, resourceVersion string, pred stor
case s.pagingEnabled && pred.Limit > 0: case s.pagingEnabled && pred.Limit > 0:
if len(resourceVersion) > 0 { if len(resourceVersion) > 0 {
fromRV, err := strconv.ParseInt(resourceVersion, 10, 64) fromRV, err := s.versioner.ParseListResourceVersion(resourceVersion)
if err != nil { if err != nil {
return apierrors.NewBadRequest(fmt.Sprintf("invalid resource version: %v", err)) return apierrors.NewBadRequest(fmt.Sprintf("invalid resource version: %v", err))
} }
if fromRV > 0 { if fromRV > 0 {
options = append(options, clientv3.WithRev(fromRV)) options = append(options, clientv3.WithRev(int64(fromRV)))
} }
returnedRV = fromRV returnedRV = int64(fromRV)
} }
rangeEnd := clientv3.GetPrefixRangeEnd(keyPrefix) rangeEnd := clientv3.GetPrefixRangeEnd(keyPrefix)
@ -539,14 +538,14 @@ func (s *store) List(ctx context.Context, key, resourceVersion string, pred stor
default: default:
if len(resourceVersion) > 0 { if len(resourceVersion) > 0 {
fromRV, err := strconv.ParseInt(resourceVersion, 10, 64) fromRV, err := s.versioner.ParseListResourceVersion(resourceVersion)
if err != nil { if err != nil {
return apierrors.NewBadRequest(fmt.Sprintf("invalid resource version: %v", err)) return apierrors.NewBadRequest(fmt.Sprintf("invalid resource version: %v", err))
} }
if fromRV > 0 { if fromRV > 0 {
options = append(options, clientv3.WithRev(fromRV)) options = append(options, clientv3.WithRev(int64(fromRV)))
} }
returnedRV = fromRV returnedRV = int64(fromRV)
} }
options = append(options, clientv3.WithPrefix()) options = append(options, clientv3.WithPrefix())
@ -666,7 +665,7 @@ func (s *store) WatchList(ctx context.Context, key string, resourceVersion strin
} }
func (s *store) watch(ctx context.Context, key string, rv string, pred storage.SelectionPredicate, recursive bool) (watch.Interface, error) { func (s *store) watch(ctx context.Context, key string, rv string, pred storage.SelectionPredicate, recursive bool) (watch.Interface, error) {
rev, err := storage.ParseWatchResourceVersion(rv) rev, err := s.versioner.ParseWatchResourceVersion(rv)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -19,7 +19,6 @@ package etcd3
import ( import (
"fmt" "fmt"
"reflect" "reflect"
"strconv"
"sync" "sync"
"testing" "testing"
"time" "time"
@ -186,7 +185,7 @@ func TestWatchFromZero(t *testing.T) {
} }
// Compact previous versions // Compact previous versions
revToCompact, err := strconv.Atoi(out.ResourceVersion) revToCompact, err := store.versioner.ParseListResourceVersion(out.ResourceVersion)
if err != nil { if err != nil {
t.Fatalf("Error converting %q to an int: %v", storedObj.ResourceVersion, err) t.Fatalf("Error converting %q to an int: %v", storedObj.ResourceVersion, err)
} }
@ -305,7 +304,7 @@ func TestWatchDeleteEventObjectHaveLatestRV(t *testing.T) {
var wres clientv3.WatchResponse var wres clientv3.WatchResponse
wres = <-etcdW wres = <-etcdW
watchedDeleteRev, err := storage.ParseWatchResourceVersion(watchedDeleteObj.ResourceVersion) watchedDeleteRev, err := store.versioner.ParseWatchResourceVersion(watchedDeleteObj.ResourceVersion)
if err != nil { if err != nil {
t.Fatalf("ParseWatchResourceVersion failed: %v", err) t.Fatalf("ParseWatchResourceVersion failed: %v", err)
} }

View File

@ -28,7 +28,9 @@ import (
// Versioner abstracts setting and retrieving metadata fields from database response // Versioner abstracts setting and retrieving metadata fields from database response
// onto the object ot list. It is required to maintain storage invariants - updating an // onto the object ot list. It is required to maintain storage invariants - updating an
// object twice with the same data except for the ResourceVersion and SelfLink must be // object twice with the same data except for the ResourceVersion and SelfLink must be
// a no-op. // a no-op. A resourceVersion of type uint64 is a 'raw' resourceVersion,
// intended to be sent directly to or from the backend. A resourceVersion of
// type string is a 'safe' resourceVersion, intended for consumption by users.
type Versioner interface { type Versioner interface {
// UpdateObject sets storage metadata into an API object. Returns an error if the object // UpdateObject sets storage metadata into an API object. Returns an error if the object
// cannot be updated correctly. May return nil if the requested object does not need metadata // cannot be updated correctly. May return nil if the requested object does not need metadata
@ -45,6 +47,17 @@ type Versioner interface {
// ObjectResourceVersion returns the resource version (for persistence) of the specified object. // ObjectResourceVersion returns the resource version (for persistence) of the specified object.
// Should return an error if the specified object does not have a persistable version. // Should return an error if the specified object does not have a persistable version.
ObjectResourceVersion(obj runtime.Object) (uint64, error) ObjectResourceVersion(obj runtime.Object) (uint64, error)
// ParseWatchResourceVersion takes a resource version argument and
// converts it to the storage backend we should pass to helper.Watch().
// Because resourceVersion is an opaque value, the default watch
// behavior for non-zero watch is to watch the next value (if you pass
// "1", you will see updates from "2" onwards).
ParseWatchResourceVersion(resourceVersion string) (uint64, error)
// ParseListResourceVersion takes a resource version argument and
// converts it to the storage backend version. Appropriate for
// everything that's not intended as an argument for watch.
ParseListResourceVersion(resourceVersion string) (uint64, error)
} }
// ResponseMeta contains information about the database metadata that is associated with // ResponseMeta contains information about the database metadata that is associated with

View File

@ -97,12 +97,13 @@ func newEtcdTestStorage(t *testing.T, prefix string) (*etcdtesting.EtcdTestServe
return server, storage return server, storage
} }
func newTestCacher(s storage.Interface, cap int) *storage.Cacher { func newTestCacher(s storage.Interface, cap int) (*storage.Cacher, storage.Versioner) {
prefix := "pods" prefix := "pods"
v := etcdstorage.APIObjectVersioner{}
config := storage.CacherConfig{ config := storage.CacherConfig{
CacheCapacity: cap, CacheCapacity: cap,
Storage: s, Storage: s,
Versioner: etcdstorage.APIObjectVersioner{}, Versioner: v,
Type: &example.Pod{}, Type: &example.Pod{},
ResourcePrefix: prefix, ResourcePrefix: prefix,
KeyFunc: func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) }, KeyFunc: func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) },
@ -110,7 +111,7 @@ func newTestCacher(s storage.Interface, cap int) *storage.Cacher {
NewListFunc: func() runtime.Object { return &example.PodList{} }, NewListFunc: func() runtime.Object { return &example.PodList{} },
Codec: codecs.LegacyCodec(examplev1.SchemeGroupVersion), Codec: codecs.LegacyCodec(examplev1.SchemeGroupVersion),
} }
return storage.NewCacherFromConfig(config) return storage.NewCacherFromConfig(config), v
} }
func makeTestPod(name string) *example.Pod { func makeTestPod(name string) *example.Pod {
@ -139,7 +140,7 @@ func updatePod(t *testing.T, s storage.Interface, obj, old *example.Pod) *exampl
func TestGet(t *testing.T) { func TestGet(t *testing.T) {
server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix()) server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix())
defer server.Terminate(t) defer server.Terminate(t)
cacher := newTestCacher(etcdStorage, 10) cacher, _ := newTestCacher(etcdStorage, 10)
defer cacher.Stop() defer cacher.Stop()
podFoo := makeTestPod("foo") podFoo := makeTestPod("foo")
@ -170,7 +171,7 @@ func TestGet(t *testing.T) {
func TestList(t *testing.T) { func TestList(t *testing.T) {
server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix()) server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix())
defer server.Terminate(t) defer server.Terminate(t)
cacher := newTestCacher(etcdStorage, 10) cacher, _ := newTestCacher(etcdStorage, 10)
defer cacher.Stop() defer cacher.Stop()
podFoo := makeTestPod("foo") podFoo := makeTestPod("foo")
@ -251,14 +252,14 @@ func TestList(t *testing.T) {
func TestInfiniteList(t *testing.T) { func TestInfiniteList(t *testing.T) {
server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix()) server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix())
defer server.Terminate(t) defer server.Terminate(t)
cacher := newTestCacher(etcdStorage, 10) cacher, v := newTestCacher(etcdStorage, 10)
defer cacher.Stop() defer cacher.Stop()
podFoo := makeTestPod("foo") podFoo := makeTestPod("foo")
fooCreated := updatePod(t, etcdStorage, podFoo, nil) fooCreated := updatePod(t, etcdStorage, podFoo, nil)
// Set up List at fooCreated.ResourceVersion + 10 // Set up List at fooCreated.ResourceVersion + 10
rv, err := storage.ParseWatchResourceVersion(fooCreated.ResourceVersion) rv, err := v.ParseWatchResourceVersion(fooCreated.ResourceVersion)
if err != nil { if err != nil {
t.Fatalf("Unexpected error: %v", err) t.Fatalf("Unexpected error: %v", err)
} }
@ -307,7 +308,7 @@ func TestWatch(t *testing.T) {
// Inject one list error to make sure we test the relist case. // Inject one list error to make sure we test the relist case.
etcdStorage = &injectListError{errors: 1, Interface: etcdStorage} etcdStorage = &injectListError{errors: 1, Interface: etcdStorage}
defer server.Terminate(t) defer server.Terminate(t)
cacher := newTestCacher(etcdStorage, 3) // small capacity to trigger "too old version" error cacher, _ := newTestCacher(etcdStorage, 3) // small capacity to trigger "too old version" error
defer cacher.Stop() defer cacher.Stop()
podFoo := makeTestPod("foo") podFoo := makeTestPod("foo")
@ -382,7 +383,7 @@ func TestWatch(t *testing.T) {
func TestWatcherTimeout(t *testing.T) { func TestWatcherTimeout(t *testing.T) {
server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix()) server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix())
defer server.Terminate(t) defer server.Terminate(t)
cacher := newTestCacher(etcdStorage, 10) cacher, _ := newTestCacher(etcdStorage, 10)
defer cacher.Stop() defer cacher.Stop()
// initialVersion is used to initate the watcher at the beginning of the world, // initialVersion is used to initate the watcher at the beginning of the world,
@ -424,7 +425,7 @@ func TestWatcherTimeout(t *testing.T) {
func TestFiltering(t *testing.T) { func TestFiltering(t *testing.T) {
server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix()) server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix())
defer server.Terminate(t) defer server.Terminate(t)
cacher := newTestCacher(etcdStorage, 10) cacher, _ := newTestCacher(etcdStorage, 10)
defer cacher.Stop() defer cacher.Stop()
// Ensure that the cacher is initialized, before creating any pods, // Ensure that the cacher is initialized, before creating any pods,
@ -486,7 +487,7 @@ func TestFiltering(t *testing.T) {
func TestStartingResourceVersion(t *testing.T) { func TestStartingResourceVersion(t *testing.T) {
server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix()) server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix())
defer server.Terminate(t) defer server.Terminate(t)
cacher := newTestCacher(etcdStorage, 10) cacher, v := newTestCacher(etcdStorage, 10)
defer cacher.Stop() defer cacher.Stop()
// add 1 object // add 1 object
@ -494,7 +495,7 @@ func TestStartingResourceVersion(t *testing.T) {
fooCreated := updatePod(t, etcdStorage, podFoo, nil) fooCreated := updatePod(t, etcdStorage, podFoo, nil)
// Set up Watch starting at fooCreated.ResourceVersion + 10 // Set up Watch starting at fooCreated.ResourceVersion + 10
rv, err := storage.ParseWatchResourceVersion(fooCreated.ResourceVersion) rv, err := v.ParseWatchResourceVersion(fooCreated.ResourceVersion)
if err != nil { if err != nil {
t.Fatalf("Unexpected error: %v", err) t.Fatalf("Unexpected error: %v", err)
} }
@ -517,7 +518,7 @@ func TestStartingResourceVersion(t *testing.T) {
select { select {
case e := <-watcher.ResultChan(): case e := <-watcher.ResultChan():
pod := e.Object.(*example.Pod) pod := e.Object.(*example.Pod)
podRV, err := storage.ParseWatchResourceVersion(pod.ResourceVersion) podRV, err := v.ParseWatchResourceVersion(pod.ResourceVersion)
if err != nil { if err != nil {
t.Fatalf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
} }
@ -544,15 +545,15 @@ func TestEmptyWatchEventCache(t *testing.T) {
fooCreated := updatePod(t, etcdStorage, makeTestPod("foo"), nil) fooCreated := updatePod(t, etcdStorage, makeTestPod("foo"), nil)
cacher, v := newTestCacher(etcdStorage, 10)
defer cacher.Stop()
// get rv of last pod created // get rv of last pod created
rv, err := storage.ParseWatchResourceVersion(fooCreated.ResourceVersion) rv, err := v.ParseWatchResourceVersion(fooCreated.ResourceVersion)
if err != nil { if err != nil {
t.Fatalf("Unexpected error: %v", err) t.Fatalf("Unexpected error: %v", err)
} }
cacher := newTestCacher(etcdStorage, 10)
defer cacher.Stop()
// We now have a cacher with an empty cache of watch events and a resourceVersion of rv. // We now have a cacher with an empty cache of watch events and a resourceVersion of rv.
// It should support establishing watches from rv and higher, but not older. // It should support establishing watches from rv and higher, but not older.
@ -598,11 +599,11 @@ func TestEmptyWatchEventCache(t *testing.T) {
func TestRandomWatchDeliver(t *testing.T) { func TestRandomWatchDeliver(t *testing.T) {
server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix()) server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix())
defer server.Terminate(t) defer server.Terminate(t)
cacher := newTestCacher(etcdStorage, 10) cacher, v := newTestCacher(etcdStorage, 10)
defer cacher.Stop() defer cacher.Stop()
fooCreated := updatePod(t, etcdStorage, makeTestPod("foo"), nil) fooCreated := updatePod(t, etcdStorage, makeTestPod("foo"), nil)
rv, err := storage.ParseWatchResourceVersion(fooCreated.ResourceVersion) rv, err := v.ParseWatchResourceVersion(fooCreated.ResourceVersion)
if err != nil { if err != nil {
t.Fatalf("Unexpected error: %v", err) t.Fatalf("Unexpected error: %v", err)
} }

View File

@ -18,14 +18,12 @@ package storage
import ( import (
"fmt" "fmt"
"strconv"
"strings" "strings"
"sync/atomic" "sync/atomic"
"k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/api/validation/path" "k8s.io/apimachinery/pkg/api/validation/path"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/validation/field"
) )
type SimpleUpdateFunc func(runtime.Object) (runtime.Object, error) type SimpleUpdateFunc func(runtime.Object) (runtime.Object, error)
@ -50,35 +48,6 @@ func NoTriggerPublisher(runtime.Object) []MatchValue {
return nil return nil
} }
// ParseWatchResourceVersion takes a resource version argument and converts it to
// the etcd version we should pass to helper.Watch(). Because resourceVersion is
// an opaque value, the default watch behavior for non-zero watch is to watch
// the next value (if you pass "1", you will see updates from "2" onwards).
func ParseWatchResourceVersion(resourceVersion string) (uint64, error) {
if resourceVersion == "" || resourceVersion == "0" {
return 0, nil
}
version, err := strconv.ParseUint(resourceVersion, 10, 64)
if err != nil {
return 0, NewInvalidError(field.ErrorList{
// Validation errors are supposed to return version-specific field
// paths, but this is probably close enough.
field.Invalid(field.NewPath("resourceVersion"), resourceVersion, err.Error()),
})
}
return version, nil
}
// ParseListResourceVersion takes a resource version argument and converts it to
// the etcd version.
func ParseListResourceVersion(resourceVersion string) (uint64, error) {
if resourceVersion == "" {
return 0, nil
}
version, err := strconv.ParseUint(resourceVersion, 10, 64)
return version, err
}
func NamespaceKeyFunc(prefix string, obj runtime.Object) (string, error) { func NamespaceKeyFunc(prefix string, obj runtime.Object) (string, error) {
meta, err := meta.Accessor(obj) meta, err := meta.Accessor(obj)
if err != nil { if err != nil {

View File

@ -22,40 +22,6 @@ import (
"testing" "testing"
) )
func TestEtcdParseWatchResourceVersion(t *testing.T) {
testCases := []struct {
Version string
ExpectVersion uint64
Err bool
}{
{Version: "", ExpectVersion: 0},
{Version: "a", Err: true},
{Version: " ", Err: true},
{Version: "1", ExpectVersion: 1},
{Version: "10", ExpectVersion: 10},
}
for _, testCase := range testCases {
version, err := ParseWatchResourceVersion(testCase.Version)
switch {
case testCase.Err:
if err == nil {
t.Errorf("%s: unexpected non-error", testCase.Version)
continue
}
if !IsInvalidError(err) {
t.Errorf("%s: unexpected error: %v", testCase.Version, err)
continue
}
case !testCase.Err && err != nil:
t.Errorf("%s: unexpected error: %v", testCase.Version, err)
continue
}
if version != testCase.ExpectVersion {
t.Errorf("%s: expected version %d but was %d", testCase.Version, testCase.ExpectVersion, version)
}
}
}
func TestHasPathPrefix(t *testing.T) { func TestHasPathPrefix(t *testing.T) {
validTestcases := []struct { validTestcases := []struct {
s string s string