From 6cb344e78ead86abc9507f14786ff16eb46436e5 Mon Sep 17 00:00:00 2001 From: fisherxu Date: Thu, 28 Jun 2018 10:00:56 +0800 Subject: [PATCH] resource version parsing should all be in one place --- .../k8s.io/apiserver/pkg/storage/cacher.go | 2 +- .../apiserver/pkg/storage/watch_cache.go | 32 +++++++------------ .../apiserver/pkg/storage/watch_cache_test.go | 4 ++- 3 files changed, 16 insertions(+), 22 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher.go index 36691bce94b..277b368b191 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher.go @@ -191,7 +191,7 @@ type Cacher struct { // its internal cache and updating its cache in the background based on the // given configuration. func NewCacherFromConfig(config CacherConfig) *Cacher { - watchCache := newWatchCache(config.CacheCapacity, config.KeyFunc, config.GetAttrsFunc) + watchCache := newWatchCache(config.CacheCapacity, config.KeyFunc, config.GetAttrsFunc, config.Versioner) listerWatcher := newCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc) reflectorName := "storage/cacher.go:" + config.ResourcePrefix diff --git a/staging/src/k8s.io/apiserver/pkg/storage/watch_cache.go b/staging/src/k8s.io/apiserver/pkg/storage/watch_cache.go index 373e74eeda5..b7644f66b88 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/watch_cache.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/watch_cache.go @@ -24,7 +24,6 @@ import ( "time" "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" @@ -137,12 +136,16 @@ type watchCache struct { // for testing timeouts. clock clock.Clock + + // An underlying storage.Versioner. + versioner Versioner } func newWatchCache( capacity int, keyFunc func(runtime.Object) (string, error), - getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, bool, error)) *watchCache { + getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, bool, error), + versioner Versioner) *watchCache { wc := &watchCache{ capacity: capacity, keyFunc: keyFunc, @@ -153,6 +156,7 @@ func newWatchCache( store: cache.NewStore(storeElementKey), resourceVersion: 0, clock: clock.RealClock{}, + versioner: versioner, } wc.cond = sync.NewCond(wc.RLocker()) return wc @@ -160,7 +164,7 @@ func newWatchCache( // Add takes runtime.Object as an argument. func (w *watchCache) Add(obj interface{}) error { - object, resourceVersion, err := objectToVersionedRuntimeObject(obj) + object, resourceVersion, err := w.objectToVersionedRuntimeObject(obj) if err != nil { return err } @@ -172,7 +176,7 @@ func (w *watchCache) Add(obj interface{}) error { // Update takes runtime.Object as an argument. func (w *watchCache) Update(obj interface{}) error { - object, resourceVersion, err := objectToVersionedRuntimeObject(obj) + object, resourceVersion, err := w.objectToVersionedRuntimeObject(obj) if err != nil { return err } @@ -184,7 +188,7 @@ func (w *watchCache) Update(obj interface{}) error { // Delete takes runtime.Object as an argument. func (w *watchCache) Delete(obj interface{}) error { - object, resourceVersion, err := objectToVersionedRuntimeObject(obj) + object, resourceVersion, err := w.objectToVersionedRuntimeObject(obj) if err != nil { return err } @@ -194,30 +198,18 @@ func (w *watchCache) Delete(obj interface{}) error { return w.processEvent(event, resourceVersion, f) } -func objectToVersionedRuntimeObject(obj interface{}) (runtime.Object, uint64, error) { +func (w *watchCache) objectToVersionedRuntimeObject(obj interface{}) (runtime.Object, uint64, error) { object, ok := obj.(runtime.Object) if !ok { return nil, 0, fmt.Errorf("obj does not implement runtime.Object interface: %v", obj) } - meta, err := meta.Accessor(object) - if err != nil { - return nil, 0, err - } - resourceVersion, err := parseResourceVersion(meta.GetResourceVersion()) + resourceVersion, err := w.versioner.ObjectResourceVersion(object) if err != nil { return nil, 0, err } return object, resourceVersion, nil } -func parseResourceVersion(resourceVersion string) (uint64, error) { - if resourceVersion == "" { - return 0, nil - } - // Use bitsize being the size of int on the machine. - return strconv.ParseUint(resourceVersion, 10, 0) -} - func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, updateFunc func(*storeElement) error) error { key, err := w.keyFunc(event.Object) if err != nil { @@ -362,7 +354,7 @@ func (w *watchCache) GetByKey(key string) (interface{}, bool, error) { // Replace takes slice of runtime.Object as a parameter. func (w *watchCache) Replace(objs []interface{}, resourceVersion string) error { - version, err := parseResourceVersion(resourceVersion) + version, err := w.versioner.ParseResourceVersion(resourceVersion) if err != nil { return err } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/watch_cache_test.go b/staging/src/k8s.io/apiserver/pkg/storage/watch_cache_test.go index f7b353e63f7..e8489b0edc4 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/watch_cache_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/watch_cache_test.go @@ -32,6 +32,7 @@ import ( "k8s.io/apimachinery/pkg/util/clock" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" + etcdstorage "k8s.io/apiserver/pkg/storage/etcd" "k8s.io/client-go/tools/cache" ) @@ -73,7 +74,8 @@ func newTestWatchCache(capacity int) *watchCache { } return labels.Set(pod.Labels), fields.Set{"spec.nodeName": pod.Spec.NodeName}, false, nil } - wc := newWatchCache(capacity, keyFunc, getAttrsFunc) + versioner := etcdstorage.APIObjectVersioner{} + wc := newWatchCache(capacity, keyFunc, getAttrsFunc, versioner) wc.clock = clock.NewFakeClock(time.Now()) return wc }