mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-28 05:57:25 +00:00
Merge pull request #65404 from fisherxu/collapse-rvParse
Automatic merge from submit-queue (batch tested with PRs 65404, 65323, 65468). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>. Collapse the list and watch resource version parse **What this PR does / why we need it**: Collapse the list and watch resource version parse, as discuss in [#64513](https://github.com/kubernetes/kubernetes/pull/64513#issuecomment-399380988) **Which issue(s) this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close the issue(s) when PR gets merged)*: Fixes # **Special notes for your reviewer**: **Release note**: ```release-note NONE ```
This commit is contained in:
commit
f9a1cb9b63
@ -289,7 +289,7 @@ func (c *Cacher) Delete(ctx context.Context, key string, out runtime.Object, pre
|
|||||||
|
|
||||||
// Implements storage.Interface.
|
// Implements storage.Interface.
|
||||||
func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string, pred SelectionPredicate) (watch.Interface, error) {
|
func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string, pred SelectionPredicate) (watch.Interface, error) {
|
||||||
watchRV, err := c.versioner.ParseWatchResourceVersion(resourceVersion)
|
watchRV, err := c.versioner.ParseResourceVersion(resourceVersion)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -360,7 +360,7 @@ func (c *Cacher) Get(ctx context.Context, key string, resourceVersion string, ob
|
|||||||
// If resourceVersion is specified, serve it from cache.
|
// If resourceVersion is specified, serve it from cache.
|
||||||
// It's guaranteed that the returned value is at least that
|
// It's guaranteed that the returned value is at least that
|
||||||
// fresh as the given resourceVersion.
|
// fresh as the given resourceVersion.
|
||||||
getRV, err := c.versioner.ParseListResourceVersion(resourceVersion)
|
getRV, err := c.versioner.ParseResourceVersion(resourceVersion)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -413,7 +413,7 @@ func (c *Cacher) GetToList(ctx context.Context, key string, resourceVersion stri
|
|||||||
// If resourceVersion is specified, serve it from cache.
|
// If resourceVersion is specified, serve it from cache.
|
||||||
// It's guaranteed that the returned value is at least that
|
// It's guaranteed that the returned value is at least that
|
||||||
// fresh as the given resourceVersion.
|
// fresh as the given resourceVersion.
|
||||||
listRV, err := c.versioner.ParseListResourceVersion(resourceVersion)
|
listRV, err := c.versioner.ParseResourceVersion(resourceVersion)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -482,7 +482,7 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, p
|
|||||||
// If resourceVersion is specified, serve it from cache.
|
// If resourceVersion is specified, serve it from cache.
|
||||||
// It's guaranteed that the returned value is at least that
|
// It's guaranteed that the returned value is at least that
|
||||||
// fresh as the given resourceVersion.
|
// fresh as the given resourceVersion.
|
||||||
listRV, err := c.versioner.ParseListResourceVersion(resourceVersion)
|
listRV, err := c.versioner.ParseResourceVersion(resourceVersion)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -699,7 +699,7 @@ func (c *Cacher) LastSyncResourceVersion() (uint64, error) {
|
|||||||
c.ready.wait()
|
c.ready.wait()
|
||||||
|
|
||||||
resourceVersion := c.reflector.LastSyncResourceVersion()
|
resourceVersion := c.reflector.LastSyncResourceVersion()
|
||||||
return c.versioner.ParseListResourceVersion(resourceVersion)
|
return c.versioner.ParseResourceVersion(resourceVersion)
|
||||||
}
|
}
|
||||||
|
|
||||||
// cacherListerWatcher opaques storage.Interface to expose cache.ListerWatcher.
|
// cacherListerWatcher opaques storage.Interface to expose cache.ListerWatcher.
|
||||||
|
@ -203,9 +203,6 @@ func (testVersioner) PrepareObjectForStorage(obj runtime.Object) error {
|
|||||||
func (testVersioner) ObjectResourceVersion(obj runtime.Object) (uint64, error) {
|
func (testVersioner) ObjectResourceVersion(obj runtime.Object) (uint64, error) {
|
||||||
return 0, fmt.Errorf("unimplemented")
|
return 0, fmt.Errorf("unimplemented")
|
||||||
}
|
}
|
||||||
func (testVersioner) ParseWatchResourceVersion(resourceVersion string) (uint64, error) {
|
func (testVersioner) ParseResourceVersion(resourceVersion string) (uint64, error) {
|
||||||
return strconv.ParseUint(resourceVersion, 10, 64)
|
|
||||||
}
|
|
||||||
func (testVersioner) ParseListResourceVersion(resourceVersion string) (uint64, error) {
|
|
||||||
return strconv.ParseUint(resourceVersion, 10, 64)
|
return strconv.ParseUint(resourceVersion, 10, 64)
|
||||||
}
|
}
|
||||||
|
@ -82,11 +82,11 @@ func (a APIObjectVersioner) ObjectResourceVersion(obj runtime.Object) (uint64, e
|
|||||||
return strconv.ParseUint(version, 10, 64)
|
return strconv.ParseUint(version, 10, 64)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ParseWatchResourceVersion takes a resource version argument and converts it to
|
// ParseResourceVersion takes a resource version argument and converts it to
|
||||||
// the etcd version we should pass to helper.Watch(). Because resourceVersion is
|
// the etcd version. For watch we should pass to helper.Watch(). Because resourceVersion is
|
||||||
// an opaque value, the default watch behavior for non-zero watch is to watch
|
// an opaque value, the default watch behavior for non-zero watch is to watch
|
||||||
// the next value (if you pass "1", you will see updates from "2" onwards).
|
// the next value (if you pass "1", you will see updates from "2" onwards).
|
||||||
func (a APIObjectVersioner) ParseWatchResourceVersion(resourceVersion string) (uint64, error) {
|
func (a APIObjectVersioner) ParseResourceVersion(resourceVersion string) (uint64, error) {
|
||||||
if resourceVersion == "" || resourceVersion == "0" {
|
if resourceVersion == "" || resourceVersion == "0" {
|
||||||
return 0, nil
|
return 0, nil
|
||||||
}
|
}
|
||||||
@ -101,25 +101,6 @@ func (a APIObjectVersioner) ParseWatchResourceVersion(resourceVersion string) (u
|
|||||||
return version, nil
|
return version, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// ParseListResourceVersion takes a resource version argument and converts it to
|
|
||||||
// the etcd version.
|
|
||||||
// TODO: reevaluate whether it is really clearer to have both this and the
|
|
||||||
// Watch version of this function, since they perform the same logic.
|
|
||||||
func (a APIObjectVersioner) ParseListResourceVersion(resourceVersion string) (uint64, error) {
|
|
||||||
if resourceVersion == "" {
|
|
||||||
return 0, nil
|
|
||||||
}
|
|
||||||
version, err := strconv.ParseUint(resourceVersion, 10, 64)
|
|
||||||
if err != nil {
|
|
||||||
return 0, storage.NewInvalidError(field.ErrorList{
|
|
||||||
// Validation errors are supposed to return version-specific field
|
|
||||||
// paths, but this is probably close enough.
|
|
||||||
field.Invalid(field.NewPath("resourceVersion"), resourceVersion, err.Error()),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
return version, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// APIObjectVersioner implements Versioner
|
// APIObjectVersioner implements Versioner
|
||||||
var Versioner storage.Versioner = APIObjectVersioner{}
|
var Versioner storage.Versioner = APIObjectVersioner{}
|
||||||
|
|
||||||
|
@ -56,8 +56,7 @@ func TestEtcdParseResourceVersion(t *testing.T) {
|
|||||||
|
|
||||||
v := APIObjectVersioner{}
|
v := APIObjectVersioner{}
|
||||||
testFuncs := []func(string) (uint64, error){
|
testFuncs := []func(string) (uint64, error){
|
||||||
v.ParseListResourceVersion,
|
v.ParseResourceVersion,
|
||||||
v.ParseWatchResourceVersion,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, testCase := range testCases {
|
for _, testCase := range testCases {
|
||||||
|
@ -231,7 +231,7 @@ func (h *etcdHelper) Watch(ctx context.Context, key string, resourceVersion stri
|
|||||||
if ctx == nil {
|
if ctx == nil {
|
||||||
glog.Errorf("Context is nil")
|
glog.Errorf("Context is nil")
|
||||||
}
|
}
|
||||||
watchRV, err := h.versioner.ParseWatchResourceVersion(resourceVersion)
|
watchRV, err := h.versioner.ParseResourceVersion(resourceVersion)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -246,7 +246,7 @@ func (h *etcdHelper) WatchList(ctx context.Context, key string, resourceVersion
|
|||||||
if ctx == nil {
|
if ctx == nil {
|
||||||
glog.Errorf("Context is nil")
|
glog.Errorf("Context is nil")
|
||||||
}
|
}
|
||||||
watchRV, err := h.versioner.ParseWatchResourceVersion(resourceVersion)
|
watchRV, err := h.versioner.ParseResourceVersion(resourceVersion)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -534,7 +534,7 @@ func (s *store) List(ctx context.Context, key, resourceVersion string, pred stor
|
|||||||
|
|
||||||
case s.pagingEnabled && pred.Limit > 0:
|
case s.pagingEnabled && pred.Limit > 0:
|
||||||
if len(resourceVersion) > 0 {
|
if len(resourceVersion) > 0 {
|
||||||
fromRV, err := s.versioner.ParseListResourceVersion(resourceVersion)
|
fromRV, err := s.versioner.ParseResourceVersion(resourceVersion)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return apierrors.NewBadRequest(fmt.Sprintf("invalid resource version: %v", err))
|
return apierrors.NewBadRequest(fmt.Sprintf("invalid resource version: %v", err))
|
||||||
}
|
}
|
||||||
@ -549,7 +549,7 @@ func (s *store) List(ctx context.Context, key, resourceVersion string, pred stor
|
|||||||
|
|
||||||
default:
|
default:
|
||||||
if len(resourceVersion) > 0 {
|
if len(resourceVersion) > 0 {
|
||||||
fromRV, err := s.versioner.ParseListResourceVersion(resourceVersion)
|
fromRV, err := s.versioner.ParseResourceVersion(resourceVersion)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return apierrors.NewBadRequest(fmt.Sprintf("invalid resource version: %v", err))
|
return apierrors.NewBadRequest(fmt.Sprintf("invalid resource version: %v", err))
|
||||||
}
|
}
|
||||||
@ -676,7 +676,7 @@ func (s *store) WatchList(ctx context.Context, key string, resourceVersion strin
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *store) watch(ctx context.Context, key string, rv string, pred storage.SelectionPredicate, recursive bool) (watch.Interface, error) {
|
func (s *store) watch(ctx context.Context, key string, rv string, pred storage.SelectionPredicate, recursive bool) (watch.Interface, error) {
|
||||||
rev, err := s.versioner.ParseWatchResourceVersion(rv)
|
rev, err := s.versioner.ParseResourceVersion(rv)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -185,7 +185,7 @@ func TestWatchFromZero(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Compact previous versions
|
// Compact previous versions
|
||||||
revToCompact, err := store.versioner.ParseListResourceVersion(out.ResourceVersion)
|
revToCompact, err := store.versioner.ParseResourceVersion(out.ResourceVersion)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Error converting %q to an int: %v", storedObj.ResourceVersion, err)
|
t.Fatalf("Error converting %q to an int: %v", storedObj.ResourceVersion, err)
|
||||||
}
|
}
|
||||||
@ -304,7 +304,7 @@ func TestWatchDeleteEventObjectHaveLatestRV(t *testing.T) {
|
|||||||
var wres clientv3.WatchResponse
|
var wres clientv3.WatchResponse
|
||||||
wres = <-etcdW
|
wres = <-etcdW
|
||||||
|
|
||||||
watchedDeleteRev, err := store.versioner.ParseWatchResourceVersion(watchedDeleteObj.ResourceVersion)
|
watchedDeleteRev, err := store.versioner.ParseResourceVersion(watchedDeleteObj.ResourceVersion)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("ParseWatchResourceVersion failed: %v", err)
|
t.Fatalf("ParseWatchResourceVersion failed: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -49,16 +49,12 @@ type Versioner interface {
|
|||||||
// Should return an error if the specified object does not have a persistable version.
|
// Should return an error if the specified object does not have a persistable version.
|
||||||
ObjectResourceVersion(obj runtime.Object) (uint64, error)
|
ObjectResourceVersion(obj runtime.Object) (uint64, error)
|
||||||
|
|
||||||
// ParseWatchResourceVersion takes a resource version argument and
|
// ParseResourceVersion takes a resource version argument and
|
||||||
// converts it to the storage backend we should pass to helper.Watch().
|
// converts it to the storage backend. For watch we should pass to helper.Watch().
|
||||||
// Because resourceVersion is an opaque value, the default watch
|
// Because resourceVersion is an opaque value, the default watch
|
||||||
// behavior for non-zero watch is to watch the next value (if you pass
|
// behavior for non-zero watch is to watch the next value (if you pass
|
||||||
// "1", you will see updates from "2" onwards).
|
// "1", you will see updates from "2" onwards).
|
||||||
ParseWatchResourceVersion(resourceVersion string) (uint64, error)
|
ParseResourceVersion(resourceVersion string) (uint64, error)
|
||||||
// ParseListResourceVersion takes a resource version argument and
|
|
||||||
// converts it to the storage backend version. Appropriate for
|
|
||||||
// everything that's not intended as an argument for watch.
|
|
||||||
ParseListResourceVersion(resourceVersion string) (uint64, error)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// ResponseMeta contains information about the database metadata that is associated with
|
// ResponseMeta contains information about the database metadata that is associated with
|
||||||
|
@ -313,7 +313,7 @@ func TestInfiniteList(t *testing.T) {
|
|||||||
fooCreated := updatePod(t, etcdStorage, podFoo, nil)
|
fooCreated := updatePod(t, etcdStorage, podFoo, nil)
|
||||||
|
|
||||||
// Set up List at fooCreated.ResourceVersion + 10
|
// Set up List at fooCreated.ResourceVersion + 10
|
||||||
rv, err := v.ParseWatchResourceVersion(fooCreated.ResourceVersion)
|
rv, err := v.ParseResourceVersion(fooCreated.ResourceVersion)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Unexpected error: %v", err)
|
t.Fatalf("Unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
@ -549,7 +549,7 @@ func TestStartingResourceVersion(t *testing.T) {
|
|||||||
fooCreated := updatePod(t, etcdStorage, podFoo, nil)
|
fooCreated := updatePod(t, etcdStorage, podFoo, nil)
|
||||||
|
|
||||||
// Set up Watch starting at fooCreated.ResourceVersion + 10
|
// Set up Watch starting at fooCreated.ResourceVersion + 10
|
||||||
rv, err := v.ParseWatchResourceVersion(fooCreated.ResourceVersion)
|
rv, err := v.ParseResourceVersion(fooCreated.ResourceVersion)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Unexpected error: %v", err)
|
t.Fatalf("Unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
@ -572,7 +572,7 @@ func TestStartingResourceVersion(t *testing.T) {
|
|||||||
select {
|
select {
|
||||||
case e := <-watcher.ResultChan():
|
case e := <-watcher.ResultChan():
|
||||||
pod := e.Object.(*example.Pod)
|
pod := e.Object.(*example.Pod)
|
||||||
podRV, err := v.ParseWatchResourceVersion(pod.ResourceVersion)
|
podRV, err := v.ParseResourceVersion(pod.ResourceVersion)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unexpected error: %v", err)
|
t.Fatalf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
@ -603,7 +603,7 @@ func TestEmptyWatchEventCache(t *testing.T) {
|
|||||||
defer cacher.Stop()
|
defer cacher.Stop()
|
||||||
|
|
||||||
// get rv of last pod created
|
// get rv of last pod created
|
||||||
rv, err := v.ParseWatchResourceVersion(fooCreated.ResourceVersion)
|
rv, err := v.ParseResourceVersion(fooCreated.ResourceVersion)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Unexpected error: %v", err)
|
t.Fatalf("Unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
@ -657,7 +657,7 @@ func TestRandomWatchDeliver(t *testing.T) {
|
|||||||
defer cacher.Stop()
|
defer cacher.Stop()
|
||||||
|
|
||||||
fooCreated := updatePod(t, etcdStorage, makeTestPod("foo"), nil)
|
fooCreated := updatePod(t, etcdStorage, makeTestPod("foo"), nil)
|
||||||
rv, err := v.ParseWatchResourceVersion(fooCreated.ResourceVersion)
|
rv, err := v.ParseResourceVersion(fooCreated.ResourceVersion)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Unexpected error: %v", err)
|
t.Fatalf("Unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user