resource version parsing should all be in one place

This commit is contained in:
fisherxu
2018-06-28 10:00:56 +08:00
parent 05f073dc28
commit 6cb344e78e
3 changed files with 16 additions and 22 deletions

View File

@@ -191,7 +191,7 @@ type Cacher struct {
// its internal cache and updating its cache in the background based on the // its internal cache and updating its cache in the background based on the
// given configuration. // given configuration.
func NewCacherFromConfig(config CacherConfig) *Cacher { func NewCacherFromConfig(config CacherConfig) *Cacher {
watchCache := newWatchCache(config.CacheCapacity, config.KeyFunc, config.GetAttrsFunc) watchCache := newWatchCache(config.CacheCapacity, config.KeyFunc, config.GetAttrsFunc, config.Versioner)
listerWatcher := newCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc) listerWatcher := newCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
reflectorName := "storage/cacher.go:" + config.ResourcePrefix reflectorName := "storage/cacher.go:" + config.ResourcePrefix

View File

@@ -24,7 +24,6 @@ import (
"time" "time"
"k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
@@ -137,12 +136,16 @@ type watchCache struct {
// for testing timeouts. // for testing timeouts.
clock clock.Clock clock clock.Clock
// An underlying storage.Versioner.
versioner Versioner
} }
func newWatchCache( func newWatchCache(
capacity int, capacity int,
keyFunc func(runtime.Object) (string, error), keyFunc func(runtime.Object) (string, error),
getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, bool, error)) *watchCache { getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, bool, error),
versioner Versioner) *watchCache {
wc := &watchCache{ wc := &watchCache{
capacity: capacity, capacity: capacity,
keyFunc: keyFunc, keyFunc: keyFunc,
@@ -153,6 +156,7 @@ func newWatchCache(
store: cache.NewStore(storeElementKey), store: cache.NewStore(storeElementKey),
resourceVersion: 0, resourceVersion: 0,
clock: clock.RealClock{}, clock: clock.RealClock{},
versioner: versioner,
} }
wc.cond = sync.NewCond(wc.RLocker()) wc.cond = sync.NewCond(wc.RLocker())
return wc return wc
@@ -160,7 +164,7 @@ func newWatchCache(
// Add takes runtime.Object as an argument. // Add takes runtime.Object as an argument.
func (w *watchCache) Add(obj interface{}) error { func (w *watchCache) Add(obj interface{}) error {
object, resourceVersion, err := objectToVersionedRuntimeObject(obj) object, resourceVersion, err := w.objectToVersionedRuntimeObject(obj)
if err != nil { if err != nil {
return err return err
} }
@@ -172,7 +176,7 @@ func (w *watchCache) Add(obj interface{}) error {
// Update takes runtime.Object as an argument. // Update takes runtime.Object as an argument.
func (w *watchCache) Update(obj interface{}) error { func (w *watchCache) Update(obj interface{}) error {
object, resourceVersion, err := objectToVersionedRuntimeObject(obj) object, resourceVersion, err := w.objectToVersionedRuntimeObject(obj)
if err != nil { if err != nil {
return err return err
} }
@@ -184,7 +188,7 @@ func (w *watchCache) Update(obj interface{}) error {
// Delete takes runtime.Object as an argument. // Delete takes runtime.Object as an argument.
func (w *watchCache) Delete(obj interface{}) error { func (w *watchCache) Delete(obj interface{}) error {
object, resourceVersion, err := objectToVersionedRuntimeObject(obj) object, resourceVersion, err := w.objectToVersionedRuntimeObject(obj)
if err != nil { if err != nil {
return err return err
} }
@@ -194,30 +198,18 @@ func (w *watchCache) Delete(obj interface{}) error {
return w.processEvent(event, resourceVersion, f) return w.processEvent(event, resourceVersion, f)
} }
func objectToVersionedRuntimeObject(obj interface{}) (runtime.Object, uint64, error) { func (w *watchCache) objectToVersionedRuntimeObject(obj interface{}) (runtime.Object, uint64, error) {
object, ok := obj.(runtime.Object) object, ok := obj.(runtime.Object)
if !ok { if !ok {
return nil, 0, fmt.Errorf("obj does not implement runtime.Object interface: %v", obj) return nil, 0, fmt.Errorf("obj does not implement runtime.Object interface: %v", obj)
} }
meta, err := meta.Accessor(object) resourceVersion, err := w.versioner.ObjectResourceVersion(object)
if err != nil {
return nil, 0, err
}
resourceVersion, err := parseResourceVersion(meta.GetResourceVersion())
if err != nil { if err != nil {
return nil, 0, err return nil, 0, err
} }
return object, resourceVersion, nil return object, resourceVersion, nil
} }
func parseResourceVersion(resourceVersion string) (uint64, error) {
if resourceVersion == "" {
return 0, nil
}
// Use bitsize being the size of int on the machine.
return strconv.ParseUint(resourceVersion, 10, 0)
}
func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, updateFunc func(*storeElement) error) error { func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, updateFunc func(*storeElement) error) error {
key, err := w.keyFunc(event.Object) key, err := w.keyFunc(event.Object)
if err != nil { if err != nil {
@@ -362,7 +354,7 @@ func (w *watchCache) GetByKey(key string) (interface{}, bool, error) {
// Replace takes slice of runtime.Object as a parameter. // Replace takes slice of runtime.Object as a parameter.
func (w *watchCache) Replace(objs []interface{}, resourceVersion string) error { func (w *watchCache) Replace(objs []interface{}, resourceVersion string) error {
version, err := parseResourceVersion(resourceVersion) version, err := w.versioner.ParseResourceVersion(resourceVersion)
if err != nil { if err != nil {
return err return err
} }

View File

@@ -32,6 +32,7 @@ import (
"k8s.io/apimachinery/pkg/util/clock" "k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
etcdstorage "k8s.io/apiserver/pkg/storage/etcd"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
) )
@@ -73,7 +74,8 @@ func newTestWatchCache(capacity int) *watchCache {
} }
return labels.Set(pod.Labels), fields.Set{"spec.nodeName": pod.Spec.NodeName}, false, nil return labels.Set(pod.Labels), fields.Set{"spec.nodeName": pod.Spec.NodeName}, false, nil
} }
wc := newWatchCache(capacity, keyFunc, getAttrsFunc) versioner := etcdstorage.APIObjectVersioner{}
wc := newWatchCache(capacity, keyFunc, getAttrsFunc, versioner)
wc.clock = clock.NewFakeClock(time.Now()) wc.clock = clock.NewFakeClock(time.Now())
return wc return wc
} }