Mutation cache should support retrieving items from ByIndex()

Allows tokens controller to observe updates

Kubernetes-commit: 5ac3214c427614ec5c4a53ddc05d952474402412
This commit is contained in:
Clayton Coleman 2017-05-18 18:25:37 -04:00 committed by Kubernetes Publisher
parent d84ceacdd1
commit 25835dfc1e

View File

@ -20,16 +20,19 @@ import (
"fmt" "fmt"
"strconv" "strconv"
"sync" "sync"
"time"
lru "github.com/hashicorp/golang-lru" "github.com/golang/glog"
"k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
utilcache "k8s.io/apimachinery/pkg/util/cache"
utilruntime "k8s.io/apimachinery/pkg/util/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
) )
// MutationCache is able to take the result of update operations and stores them in an LRU // MutationCache is able to take the result of update operations and stores them in an LRU
// that can be used to provide a more current view of a requested object. It requires interpretting // that can be used to provide a more current view of a requested object. It requires interpreting
// resourceVersions for comparisons. // resourceVersions for comparisons.
// Implementations must be thread-safe. // Implementations must be thread-safe.
// TODO find a way to layer this into an informer/lister // TODO find a way to layer this into an informer/lister
@ -50,19 +53,20 @@ type ResourceVersionComparator interface {
// - increases when updated // - increases when updated
// - is comparable across the same resource in a namespace // - is comparable across the same resource in a namespace
// //
// Most backends will have these semantics. Indexer may be nil. // Most backends will have these semantics. Indexer may be nil. ttl controls how long an item
func NewIntegerResourceVersionMutationCache(backingCache Store, indexer Indexer) MutationCache { // remains in the mutation cache before it is removed.
lru, err := lru.New(100) //
if err != nil { // If includeAdds is true, objects in the mutation cache will be returned even if they don't exist
// errors only happen on invalid sizes, this would be programmer error // in the underlying store. This is only safe if your use of the cache can handle mutation entries
panic(err) // remaining in the cache for up to ttl when mutations and deletes occur very closely in time.
} func NewIntegerResourceVersionMutationCache(backingCache Store, indexer Indexer, ttl time.Duration, includeAdds bool) MutationCache {
return &mutationCache{ return &mutationCache{
backingCache: backingCache, backingCache: backingCache,
indexer: indexer, indexer: indexer,
mutationCache: lru, mutationCache: utilcache.NewLRUExpireCache(100),
comparator: etcdObjectVersioner{}, comparator: etcdObjectVersioner{},
ttl: ttl,
includeAdds: includeAdds,
} }
} }
@ -73,7 +77,9 @@ type mutationCache struct {
lock sync.Mutex lock sync.Mutex
backingCache Store backingCache Store
indexer Indexer indexer Indexer
mutationCache *lru.Cache mutationCache *utilcache.LRUExpireCache
includeAdds bool
ttl time.Duration
comparator ResourceVersionComparator comparator ResourceVersionComparator
} }
@ -90,10 +96,16 @@ func (c *mutationCache) GetByKey(key string) (interface{}, bool, error) {
return nil, false, err return nil, false, err
} }
if !exists { if !exists {
if !c.includeAdds {
// we can't distinguish between, "didn't observe create" and "was deleted after create", so // we can't distinguish between, "didn't observe create" and "was deleted after create", so
// if the key is missing, we always return it as missing // if the key is missing, we always return it as missing
return nil, false, nil return nil, false, nil
} }
obj, exists = c.mutationCache.Get(key)
if !exists {
return nil, false, nil
}
}
objRuntime, ok := obj.(runtime.Object) objRuntime, ok := obj.(runtime.Object)
if !ok { if !ok {
return obj, true, nil return obj, true, nil
@ -114,7 +126,9 @@ func (c *mutationCache) ByIndex(name string, indexKey string) ([]interface{}, er
return nil, err return nil, err
} }
var items []interface{} var items []interface{}
keySet := sets.NewString()
for _, key := range keys { for _, key := range keys {
keySet.Insert(key)
obj, exists, err := c.indexer.GetByKey(key) obj, exists, err := c.indexer.GetByKey(key)
if err != nil { if err != nil {
return nil, err return nil, err
@ -128,6 +142,33 @@ func (c *mutationCache) ByIndex(name string, indexKey string) ([]interface{}, er
items = append(items, obj) items = append(items, obj)
} }
} }
if c.includeAdds {
fn := c.indexer.GetIndexers()[name]
// Keys() is returned oldest to newest, so full traversal does not alter the LRU behavior
for _, key := range c.mutationCache.Keys() {
updated, ok := c.mutationCache.Get(key)
if !ok {
continue
}
if keySet.Has(key.(string)) {
continue
}
elements, err := fn(updated)
if err != nil {
glog.V(4).Info("Unable to calculate an index entry for mutation cache entry %s: %v", key, err)
continue
}
for _, inIndex := range elements {
if inIndex != indexKey {
continue
}
items = append(items, updated)
break
}
}
}
return items, nil return items, nil
} }
@ -175,7 +216,7 @@ func (c *mutationCache) Mutation(obj interface{}) {
} }
} }
} }
c.mutationCache.Add(key, obj) c.mutationCache.Add(key, obj, c.ttl)
} }
// etcdObjectVersioner implements versioning and extracting etcd node information // etcdObjectVersioner implements versioning and extracting etcd node information