mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-27 21:47:07 +00:00
Merge pull request #35029 from wojtek-t/avoid_computing_keys_multiple_times
Automatic merge from submit-queue Avoid computing keys multiple times in Cacher This should significantly reduce both cpu-usage and number of allocations in Cacher. This is a proper follow-up from #30998 @deads2k @liggitt
This commit is contained in:
commit
cb03ed36bd
@ -125,6 +125,8 @@ func (i *indexedWatchers) terminateAll(objectType reflect.Type) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type filterObjectFunc func(string, runtime.Object) bool
|
||||||
|
|
||||||
// Cacher is responsible for serving WATCH and LIST requests for a given
|
// Cacher is responsible for serving WATCH and LIST requests for a given
|
||||||
// resource from its internal cache and updating its cache in the background
|
// resource from its internal cache and updating its cache in the background
|
||||||
// based on the underlying storage contents.
|
// based on the underlying storage contents.
|
||||||
@ -161,9 +163,6 @@ type Cacher struct {
|
|||||||
// Versioner is used to handle resource versions.
|
// Versioner is used to handle resource versions.
|
||||||
versioner Versioner
|
versioner Versioner
|
||||||
|
|
||||||
// keyFunc is used to get a key in the underyling storage for a given object.
|
|
||||||
keyFunc func(runtime.Object) (string, error)
|
|
||||||
|
|
||||||
// triggerFunc is used for optimizing amount of watchers that needs to process
|
// triggerFunc is used for optimizing amount of watchers that needs to process
|
||||||
// an incoming event.
|
// an incoming event.
|
||||||
triggerFunc TriggerPublisherFunc
|
triggerFunc TriggerPublisherFunc
|
||||||
@ -183,7 +182,7 @@ type Cacher struct {
|
|||||||
// internal cache and updating its cache in the background based on the given
|
// internal cache and updating its cache in the background based on the given
|
||||||
// configuration.
|
// configuration.
|
||||||
func NewCacherFromConfig(config CacherConfig) *Cacher {
|
func NewCacherFromConfig(config CacherConfig) *Cacher {
|
||||||
watchCache := newWatchCache(config.CacheCapacity)
|
watchCache := newWatchCache(config.CacheCapacity, config.KeyFunc)
|
||||||
listerWatcher := newCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
|
listerWatcher := newCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
|
||||||
|
|
||||||
// Give this error when it is constructed rather than when you get the
|
// Give this error when it is constructed rather than when you get the
|
||||||
@ -201,7 +200,6 @@ func NewCacherFromConfig(config CacherConfig) *Cacher {
|
|||||||
watchCache: watchCache,
|
watchCache: watchCache,
|
||||||
reflector: cache.NewReflector(listerWatcher, config.Type, watchCache, 0),
|
reflector: cache.NewReflector(listerWatcher, config.Type, watchCache, 0),
|
||||||
versioner: config.Versioner,
|
versioner: config.Versioner,
|
||||||
keyFunc: config.KeyFunc,
|
|
||||||
triggerFunc: config.TriggerPublisherFunc,
|
triggerFunc: config.TriggerPublisherFunc,
|
||||||
watcherIdx: 0,
|
watcherIdx: 0,
|
||||||
watchers: indexedWatchers{
|
watchers: indexedWatchers{
|
||||||
@ -328,7 +326,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string,
|
|||||||
c.Lock()
|
c.Lock()
|
||||||
defer c.Unlock()
|
defer c.Unlock()
|
||||||
forget := forgetWatcher(c, c.watcherIdx, triggerValue, triggerSupported)
|
forget := forgetWatcher(c, c.watcherIdx, triggerValue, triggerSupported)
|
||||||
watcher := newCacheWatcher(watchRV, chanSize, initEvents, filterFunction(key, c.keyFunc, pred), forget)
|
watcher := newCacheWatcher(watchRV, chanSize, initEvents, filterFunction(key, pred), forget)
|
||||||
|
|
||||||
c.watchers.addWatcher(watcher, c.watcherIdx, triggerValue, triggerSupported)
|
c.watchers.addWatcher(watcher, c.watcherIdx, triggerValue, triggerSupported)
|
||||||
c.watcherIdx++
|
c.watcherIdx++
|
||||||
@ -382,7 +380,7 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, p
|
|||||||
if err != nil || listVal.Kind() != reflect.Slice {
|
if err != nil || listVal.Kind() != reflect.Slice {
|
||||||
return fmt.Errorf("need a pointer to slice, got %v", listVal.Kind())
|
return fmt.Errorf("need a pointer to slice, got %v", listVal.Kind())
|
||||||
}
|
}
|
||||||
filter := filterFunction(key, c.keyFunc, pred)
|
filter := filterFunction(key, pred)
|
||||||
|
|
||||||
objs, readResourceVersion, err := c.watchCache.WaitUntilFreshAndList(listRV, trace)
|
objs, readResourceVersion, err := c.watchCache.WaitUntilFreshAndList(listRV, trace)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -390,12 +388,12 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, p
|
|||||||
}
|
}
|
||||||
trace.Step(fmt.Sprintf("Listed %d items from cache", len(objs)))
|
trace.Step(fmt.Sprintf("Listed %d items from cache", len(objs)))
|
||||||
for _, obj := range objs {
|
for _, obj := range objs {
|
||||||
object, ok := obj.(runtime.Object)
|
elem, ok := obj.(*storeElement)
|
||||||
if !ok {
|
if !ok {
|
||||||
return fmt.Errorf("non runtime.Object returned from storage: %v", obj)
|
return fmt.Errorf("non *storeElement returned from storage: %v", obj)
|
||||||
}
|
}
|
||||||
if filter(object) {
|
if filter(elem.Key, elem.Object) {
|
||||||
listVal.Set(reflect.Append(listVal, reflect.ValueOf(object).Elem()))
|
listVal.Set(reflect.Append(listVal, reflect.ValueOf(elem.Object).Elem()))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
trace.Step(fmt.Sprintf("Filtered %d items", listVal.Len()))
|
trace.Step(fmt.Sprintf("Filtered %d items", listVal.Len()))
|
||||||
@ -524,14 +522,9 @@ func forgetWatcher(c *Cacher, index int, triggerValue string, triggerSupported b
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func filterFunction(key string, keyFunc func(runtime.Object) (string, error), p SelectionPredicate) FilterFunc {
|
func filterFunction(key string, p SelectionPredicate) filterObjectFunc {
|
||||||
f := SimpleFilter(p)
|
f := SimpleFilter(p)
|
||||||
filterFunc := func(obj runtime.Object) bool {
|
filterFunc := func(objKey string, obj runtime.Object) bool {
|
||||||
objKey, err := keyFunc(obj)
|
|
||||||
if err != nil {
|
|
||||||
glog.Errorf("invalid object for filter. Obj: %v. Err: %v", obj, err)
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
if !hasPathPrefix(objKey, key) {
|
if !hasPathPrefix(objKey, key) {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
@ -626,12 +619,12 @@ type cacheWatcher struct {
|
|||||||
sync.Mutex
|
sync.Mutex
|
||||||
input chan watchCacheEvent
|
input chan watchCacheEvent
|
||||||
result chan watch.Event
|
result chan watch.Event
|
||||||
filter FilterFunc
|
filter filterObjectFunc
|
||||||
stopped bool
|
stopped bool
|
||||||
forget func(bool)
|
forget func(bool)
|
||||||
}
|
}
|
||||||
|
|
||||||
func newCacheWatcher(resourceVersion uint64, chanSize int, initEvents []watchCacheEvent, filter FilterFunc, forget func(bool)) *cacheWatcher {
|
func newCacheWatcher(resourceVersion uint64, chanSize int, initEvents []watchCacheEvent, filter filterObjectFunc, forget func(bool)) *cacheWatcher {
|
||||||
watcher := &cacheWatcher{
|
watcher := &cacheWatcher{
|
||||||
input: make(chan watchCacheEvent, chanSize),
|
input: make(chan watchCacheEvent, chanSize),
|
||||||
result: make(chan watch.Event, chanSize),
|
result: make(chan watch.Event, chanSize),
|
||||||
@ -707,11 +700,12 @@ func (c *cacheWatcher) add(event *watchCacheEvent) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cacheWatcher) sendWatchCacheEvent(event watchCacheEvent) {
|
// NOTE: sendWatchCacheEvent is assumed to not modify <event> !!!
|
||||||
curObjPasses := event.Type != watch.Deleted && c.filter(event.Object)
|
func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) {
|
||||||
|
curObjPasses := event.Type != watch.Deleted && c.filter(event.Key, event.Object)
|
||||||
oldObjPasses := false
|
oldObjPasses := false
|
||||||
if event.PrevObject != nil {
|
if event.PrevObject != nil {
|
||||||
oldObjPasses = c.filter(event.PrevObject)
|
oldObjPasses = c.filter(event.Key, event.PrevObject)
|
||||||
}
|
}
|
||||||
if !curObjPasses && !oldObjPasses {
|
if !curObjPasses && !oldObjPasses {
|
||||||
// Watcher is not interested in that object.
|
// Watcher is not interested in that object.
|
||||||
@ -752,7 +746,7 @@ func (c *cacheWatcher) process(initEvents []watchCacheEvent, resourceVersion uin
|
|||||||
const initProcessThreshold = 500 * time.Millisecond
|
const initProcessThreshold = 500 * time.Millisecond
|
||||||
startTime := time.Now()
|
startTime := time.Now()
|
||||||
for _, event := range initEvents {
|
for _, event := range initEvents {
|
||||||
c.sendWatchCacheEvent(event)
|
c.sendWatchCacheEvent(&event)
|
||||||
}
|
}
|
||||||
processingTime := time.Since(startTime)
|
processingTime := time.Since(startTime)
|
||||||
if processingTime > initProcessThreshold {
|
if processingTime > initProcessThreshold {
|
||||||
@ -772,7 +766,7 @@ func (c *cacheWatcher) process(initEvents []watchCacheEvent, resourceVersion uin
|
|||||||
}
|
}
|
||||||
// only send events newer than resourceVersion
|
// only send events newer than resourceVersion
|
||||||
if event.ResourceVersion > resourceVersion {
|
if event.ResourceVersion > resourceVersion {
|
||||||
c.sendWatchCacheEvent(event)
|
c.sendWatchCacheEvent(&event)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -46,9 +46,27 @@ type watchCacheEvent struct {
|
|||||||
Type watch.EventType
|
Type watch.EventType
|
||||||
Object runtime.Object
|
Object runtime.Object
|
||||||
PrevObject runtime.Object
|
PrevObject runtime.Object
|
||||||
|
Key string
|
||||||
ResourceVersion uint64
|
ResourceVersion uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Computing a key of an object is generally non-trivial (it performs
|
||||||
|
// e.g. validation underneath). To avoid computing it multiple times
|
||||||
|
// (to serve the event in different List/Watch requests), in the
|
||||||
|
// underlying store we are keeping pair (key, object).
|
||||||
|
type storeElement struct {
|
||||||
|
Key string
|
||||||
|
Object runtime.Object
|
||||||
|
}
|
||||||
|
|
||||||
|
func storeElementKey(obj interface{}) (string, error) {
|
||||||
|
elem, ok := obj.(*storeElement)
|
||||||
|
if !ok {
|
||||||
|
return "", fmt.Errorf("not a storeElement: %v", obj)
|
||||||
|
}
|
||||||
|
return elem.Key, nil
|
||||||
|
}
|
||||||
|
|
||||||
// watchCacheElement is a single "watch event" stored in a cache.
|
// watchCacheElement is a single "watch event" stored in a cache.
|
||||||
// It contains the resource version of the object and the object
|
// It contains the resource version of the object and the object
|
||||||
// itself.
|
// itself.
|
||||||
@ -72,6 +90,9 @@ type watchCache struct {
|
|||||||
// Maximum size of history window.
|
// Maximum size of history window.
|
||||||
capacity int
|
capacity int
|
||||||
|
|
||||||
|
// keyFunc is used to get a key in the underlying storage for a given object.
|
||||||
|
keyFunc func(runtime.Object) (string, error)
|
||||||
|
|
||||||
// cache is used a cyclic buffer - its first element (with the smallest
|
// cache is used a cyclic buffer - its first element (with the smallest
|
||||||
// resourceVersion) is defined by startIndex, its last element is defined
|
// resourceVersion) is defined by startIndex, its last element is defined
|
||||||
// by endIndex (if cache is full it will be startIndex + capacity).
|
// by endIndex (if cache is full it will be startIndex + capacity).
|
||||||
@ -100,13 +121,14 @@ type watchCache struct {
|
|||||||
clock clock.Clock
|
clock clock.Clock
|
||||||
}
|
}
|
||||||
|
|
||||||
func newWatchCache(capacity int) *watchCache {
|
func newWatchCache(capacity int, keyFunc func(runtime.Object) (string, error)) *watchCache {
|
||||||
wc := &watchCache{
|
wc := &watchCache{
|
||||||
capacity: capacity,
|
capacity: capacity,
|
||||||
|
keyFunc: keyFunc,
|
||||||
cache: make([]watchCacheElement, capacity),
|
cache: make([]watchCacheElement, capacity),
|
||||||
startIndex: 0,
|
startIndex: 0,
|
||||||
endIndex: 0,
|
endIndex: 0,
|
||||||
store: cache.NewStore(cache.MetaNamespaceKeyFunc),
|
store: cache.NewStore(storeElementKey),
|
||||||
resourceVersion: 0,
|
resourceVersion: 0,
|
||||||
clock: clock.RealClock{},
|
clock: clock.RealClock{},
|
||||||
}
|
}
|
||||||
@ -114,6 +136,7 @@ func newWatchCache(capacity int) *watchCache {
|
|||||||
return wc
|
return wc
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Add takes runtime.Object as an argument.
|
||||||
func (w *watchCache) Add(obj interface{}) error {
|
func (w *watchCache) Add(obj interface{}) error {
|
||||||
object, resourceVersion, err := objectToVersionedRuntimeObject(obj)
|
object, resourceVersion, err := objectToVersionedRuntimeObject(obj)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -121,10 +144,11 @@ func (w *watchCache) Add(obj interface{}) error {
|
|||||||
}
|
}
|
||||||
event := watch.Event{Type: watch.Added, Object: object}
|
event := watch.Event{Type: watch.Added, Object: object}
|
||||||
|
|
||||||
f := func(obj runtime.Object) error { return w.store.Add(obj) }
|
f := func(elem *storeElement) error { return w.store.Add(elem) }
|
||||||
return w.processEvent(event, resourceVersion, f)
|
return w.processEvent(event, resourceVersion, f)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Update takes runtime.Object as an argument.
|
||||||
func (w *watchCache) Update(obj interface{}) error {
|
func (w *watchCache) Update(obj interface{}) error {
|
||||||
object, resourceVersion, err := objectToVersionedRuntimeObject(obj)
|
object, resourceVersion, err := objectToVersionedRuntimeObject(obj)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -132,10 +156,11 @@ func (w *watchCache) Update(obj interface{}) error {
|
|||||||
}
|
}
|
||||||
event := watch.Event{Type: watch.Modified, Object: object}
|
event := watch.Event{Type: watch.Modified, Object: object}
|
||||||
|
|
||||||
f := func(obj runtime.Object) error { return w.store.Update(obj) }
|
f := func(elem *storeElement) error { return w.store.Update(elem) }
|
||||||
return w.processEvent(event, resourceVersion, f)
|
return w.processEvent(event, resourceVersion, f)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Delete takes runtime.Object as an argument.
|
||||||
func (w *watchCache) Delete(obj interface{}) error {
|
func (w *watchCache) Delete(obj interface{}) error {
|
||||||
object, resourceVersion, err := objectToVersionedRuntimeObject(obj)
|
object, resourceVersion, err := objectToVersionedRuntimeObject(obj)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -143,7 +168,7 @@ func (w *watchCache) Delete(obj interface{}) error {
|
|||||||
}
|
}
|
||||||
event := watch.Event{Type: watch.Deleted, Object: object}
|
event := watch.Event{Type: watch.Deleted, Object: object}
|
||||||
|
|
||||||
f := func(obj runtime.Object) error { return w.store.Delete(obj) }
|
f := func(elem *storeElement) error { return w.store.Delete(elem) }
|
||||||
return w.processEvent(event, resourceVersion, f)
|
return w.processEvent(event, resourceVersion, f)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -170,43 +195,57 @@ func parseResourceVersion(resourceVersion string) (uint64, error) {
|
|||||||
return strconv.ParseUint(resourceVersion, 10, 64)
|
return strconv.ParseUint(resourceVersion, 10, 64)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, updateFunc func(runtime.Object) error) error {
|
func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, updateFunc func(*storeElement) error) error {
|
||||||
|
key, err := w.keyFunc(event.Object)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("couldn't compute key: %v", err)
|
||||||
|
}
|
||||||
|
elem := &storeElement{Key: key, Object: event.Object}
|
||||||
|
|
||||||
w.Lock()
|
w.Lock()
|
||||||
defer w.Unlock()
|
defer w.Unlock()
|
||||||
previous, exists, err := w.store.Get(event.Object)
|
previous, exists, err := w.store.Get(elem)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
var prevObject runtime.Object
|
var prevObject runtime.Object
|
||||||
if exists {
|
if exists {
|
||||||
prevObject = previous.(runtime.Object)
|
prevObject = previous.(*storeElement).Object
|
||||||
|
}
|
||||||
|
watchCacheEvent := watchCacheEvent{
|
||||||
|
Type: event.Type,
|
||||||
|
Object: event.Object,
|
||||||
|
PrevObject: prevObject,
|
||||||
|
Key: key,
|
||||||
|
ResourceVersion: resourceVersion,
|
||||||
}
|
}
|
||||||
watchCacheEvent := watchCacheEvent{event.Type, event.Object, prevObject, resourceVersion}
|
|
||||||
if w.onEvent != nil {
|
if w.onEvent != nil {
|
||||||
w.onEvent(watchCacheEvent)
|
w.onEvent(watchCacheEvent)
|
||||||
}
|
}
|
||||||
w.updateCache(resourceVersion, watchCacheEvent)
|
w.updateCache(resourceVersion, &watchCacheEvent)
|
||||||
w.resourceVersion = resourceVersion
|
w.resourceVersion = resourceVersion
|
||||||
w.cond.Broadcast()
|
w.cond.Broadcast()
|
||||||
return updateFunc(event.Object)
|
return updateFunc(elem)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Assumes that lock is already held for write.
|
// Assumes that lock is already held for write.
|
||||||
func (w *watchCache) updateCache(resourceVersion uint64, event watchCacheEvent) {
|
func (w *watchCache) updateCache(resourceVersion uint64, event *watchCacheEvent) {
|
||||||
if w.endIndex == w.startIndex+w.capacity {
|
if w.endIndex == w.startIndex+w.capacity {
|
||||||
// Cache is full - remove the oldest element.
|
// Cache is full - remove the oldest element.
|
||||||
w.startIndex++
|
w.startIndex++
|
||||||
}
|
}
|
||||||
w.cache[w.endIndex%w.capacity] = watchCacheElement{resourceVersion, event}
|
w.cache[w.endIndex%w.capacity] = watchCacheElement{resourceVersion, *event}
|
||||||
w.endIndex++
|
w.endIndex++
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// List returns list of pointers to <storeElement> objects.
|
||||||
func (w *watchCache) List() []interface{} {
|
func (w *watchCache) List() []interface{} {
|
||||||
w.RLock()
|
w.RLock()
|
||||||
defer w.RUnlock()
|
defer w.RUnlock()
|
||||||
return w.store.List()
|
return w.store.List()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WaitUntilFreshAndList returns list of pointers to <storeElement> objects.
|
||||||
func (w *watchCache) WaitUntilFreshAndList(resourceVersion uint64, trace *util.Trace) ([]interface{}, uint64, error) {
|
func (w *watchCache) WaitUntilFreshAndList(resourceVersion uint64, trace *util.Trace) ([]interface{}, uint64, error) {
|
||||||
startTime := w.clock.Now()
|
startTime := w.clock.Now()
|
||||||
go func() {
|
go func() {
|
||||||
@ -244,30 +283,56 @@ func (w *watchCache) ListKeys() []string {
|
|||||||
return w.store.ListKeys()
|
return w.store.ListKeys()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Get takes runtime.Object as a parameter. However, it returns
|
||||||
|
// pointer to <storeElement>.
|
||||||
func (w *watchCache) Get(obj interface{}) (interface{}, bool, error) {
|
func (w *watchCache) Get(obj interface{}) (interface{}, bool, error) {
|
||||||
|
object, ok := obj.(runtime.Object)
|
||||||
|
if !ok {
|
||||||
|
return nil, false, fmt.Errorf("obj does not implement runtime.Object interface: %v", obj)
|
||||||
|
}
|
||||||
|
key, err := w.keyFunc(object)
|
||||||
|
if err != nil {
|
||||||
|
return nil, false, fmt.Errorf("couldn't compute key: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
w.RLock()
|
w.RLock()
|
||||||
defer w.RUnlock()
|
defer w.RUnlock()
|
||||||
return w.store.Get(obj)
|
return w.store.Get(&storeElement{Key: key, Object: object})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetByKey returns pointer to <storeElement>.
|
||||||
func (w *watchCache) GetByKey(key string) (interface{}, bool, error) {
|
func (w *watchCache) GetByKey(key string) (interface{}, bool, error) {
|
||||||
w.RLock()
|
w.RLock()
|
||||||
defer w.RUnlock()
|
defer w.RUnlock()
|
||||||
return w.store.GetByKey(key)
|
return w.store.GetByKey(key)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Replace takes slice of runtime.Object as a paramater.
|
||||||
func (w *watchCache) Replace(objs []interface{}, resourceVersion string) error {
|
func (w *watchCache) Replace(objs []interface{}, resourceVersion string) error {
|
||||||
version, err := parseResourceVersion(resourceVersion)
|
version, err := parseResourceVersion(resourceVersion)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
toReplace := make([]interface{}, 0, len(objs))
|
||||||
|
for _, obj := range objs {
|
||||||
|
object, ok := obj.(runtime.Object)
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("didn't get runtime.Object for replace: %#v", obj)
|
||||||
|
}
|
||||||
|
key, err := w.keyFunc(object)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("couldn't compute key: %v", err)
|
||||||
|
}
|
||||||
|
toReplace = append(toReplace, &storeElement{Key: key, Object: object})
|
||||||
|
}
|
||||||
|
|
||||||
w.Lock()
|
w.Lock()
|
||||||
defer w.Unlock()
|
defer w.Unlock()
|
||||||
|
|
||||||
w.startIndex = 0
|
w.startIndex = 0
|
||||||
w.endIndex = 0
|
w.endIndex = 0
|
||||||
if err := w.store.Replace(objs, resourceVersion); err != nil {
|
if err := w.store.Replace(toReplace, resourceVersion); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
w.resourceVersion = version
|
w.resourceVersion = version
|
||||||
@ -306,7 +371,16 @@ func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]wa
|
|||||||
allItems := w.store.List()
|
allItems := w.store.List()
|
||||||
result := make([]watchCacheEvent, len(allItems))
|
result := make([]watchCacheEvent, len(allItems))
|
||||||
for i, item := range allItems {
|
for i, item := range allItems {
|
||||||
result[i] = watchCacheEvent{Type: watch.Added, Object: item.(runtime.Object)}
|
elem, ok := item.(*storeElement)
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("not a storeElement: %v", elem)
|
||||||
|
}
|
||||||
|
result[i] = watchCacheEvent{
|
||||||
|
Type: watch.Added,
|
||||||
|
Object: elem.Object,
|
||||||
|
Key: elem.Key,
|
||||||
|
ResourceVersion: w.resourceVersion,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
@ -44,7 +44,10 @@ func makeTestPod(name string, resourceVersion uint64) *api.Pod {
|
|||||||
|
|
||||||
// newTestWatchCache just adds a fake clock.
|
// newTestWatchCache just adds a fake clock.
|
||||||
func newTestWatchCache(capacity int) *watchCache {
|
func newTestWatchCache(capacity int) *watchCache {
|
||||||
wc := newWatchCache(capacity)
|
keyFunc := func(obj runtime.Object) (string, error) {
|
||||||
|
return NamespaceKeyFunc("prefix", obj)
|
||||||
|
}
|
||||||
|
wc := newWatchCache(capacity, keyFunc)
|
||||||
wc.clock = clock.NewFakeClock(time.Now())
|
wc.clock = clock.NewFakeClock(time.Now())
|
||||||
return wc
|
return wc
|
||||||
}
|
}
|
||||||
@ -60,7 +63,7 @@ func TestWatchCacheBasic(t *testing.T) {
|
|||||||
if item, ok, _ := store.Get(pod1); !ok {
|
if item, ok, _ := store.Get(pod1); !ok {
|
||||||
t.Errorf("didn't find pod")
|
t.Errorf("didn't find pod")
|
||||||
} else {
|
} else {
|
||||||
if !api.Semantic.DeepEqual(pod1, item) {
|
if !api.Semantic.DeepEqual(&storeElement{Key: "prefix/ns/pod", Object: pod1}, item) {
|
||||||
t.Errorf("expected %v, got %v", pod1, item)
|
t.Errorf("expected %v, got %v", pod1, item)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -71,7 +74,7 @@ func TestWatchCacheBasic(t *testing.T) {
|
|||||||
if item, ok, _ := store.Get(pod2); !ok {
|
if item, ok, _ := store.Get(pod2); !ok {
|
||||||
t.Errorf("didn't find pod")
|
t.Errorf("didn't find pod")
|
||||||
} else {
|
} else {
|
||||||
if !api.Semantic.DeepEqual(pod2, item) {
|
if !api.Semantic.DeepEqual(&storeElement{Key: "prefix/ns/pod", Object: pod2}, item) {
|
||||||
t.Errorf("expected %v, got %v", pod1, item)
|
t.Errorf("expected %v, got %v", pod1, item)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -90,7 +93,7 @@ func TestWatchCacheBasic(t *testing.T) {
|
|||||||
{
|
{
|
||||||
podNames := sets.String{}
|
podNames := sets.String{}
|
||||||
for _, item := range store.List() {
|
for _, item := range store.List() {
|
||||||
podNames.Insert(item.(*api.Pod).ObjectMeta.Name)
|
podNames.Insert(item.(*storeElement).Object.(*api.Pod).ObjectMeta.Name)
|
||||||
}
|
}
|
||||||
if !podNames.HasAll("pod1", "pod2", "pod3") {
|
if !podNames.HasAll("pod1", "pod2", "pod3") {
|
||||||
t.Errorf("missing pods, found %v", podNames)
|
t.Errorf("missing pods, found %v", podNames)
|
||||||
@ -108,7 +111,7 @@ func TestWatchCacheBasic(t *testing.T) {
|
|||||||
{
|
{
|
||||||
podNames := sets.String{}
|
podNames := sets.String{}
|
||||||
for _, item := range store.List() {
|
for _, item := range store.List() {
|
||||||
podNames.Insert(item.(*api.Pod).ObjectMeta.Name)
|
podNames.Insert(item.(*storeElement).Object.(*api.Pod).ObjectMeta.Name)
|
||||||
}
|
}
|
||||||
if !podNames.HasAll("pod4", "pod5") {
|
if !podNames.HasAll("pod4", "pod5") {
|
||||||
t.Errorf("missing pods, found %v", podNames)
|
t.Errorf("missing pods, found %v", podNames)
|
||||||
|
Loading…
Reference in New Issue
Block a user