mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-29 06:27:05 +00:00
add mutation cache filter
This commit is contained in:
parent
278b1e56c2
commit
f88c7725b4
8
staging/src/k8s.io/apiserver/Godeps/Godeps.json
generated
8
staging/src/k8s.io/apiserver/Godeps/Godeps.json
generated
@ -434,6 +434,14 @@
|
|||||||
"ImportPath": "github.com/grpc-ecosystem/grpc-gateway/utilities",
|
"ImportPath": "github.com/grpc-ecosystem/grpc-gateway/utilities",
|
||||||
"Rev": "84398b94e188ee336f307779b57b3aa91af7063c"
|
"Rev": "84398b94e188ee336f307779b57b3aa91af7063c"
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
"ImportPath": "github.com/hashicorp/golang-lru",
|
||||||
|
"Rev": "a0d98a5f288019575c6d1f4bb1573fef2d1fcdc4"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"ImportPath": "github.com/hashicorp/golang-lru/simplelru",
|
||||||
|
"Rev": "a0d98a5f288019575c6d1f4bb1573fef2d1fcdc4"
|
||||||
|
},
|
||||||
{
|
{
|
||||||
"ImportPath": "github.com/howeyc/gopass",
|
"ImportPath": "github.com/howeyc/gopass",
|
||||||
"Rev": "3ca23474a7c7203e0a0a070fd33508f6efdb9b3d"
|
"Rev": "3ca23474a7c7203e0a0a070fd33508f6efdb9b3d"
|
||||||
|
10
staging/src/k8s.io/client-go/Godeps/Godeps.json
generated
10
staging/src/k8s.io/client-go/Godeps/Godeps.json
generated
@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"ImportPath": "k8s.io/client-go",
|
"ImportPath": "k8s.io/client-go",
|
||||||
"GoVersion": "go1.7",
|
"GoVersion": "go1.8",
|
||||||
"GodepVersion": "v79",
|
"GodepVersion": "v79",
|
||||||
"Packages": [
|
"Packages": [
|
||||||
"./..."
|
"./..."
|
||||||
@ -138,6 +138,14 @@
|
|||||||
"ImportPath": "github.com/google/gofuzz",
|
"ImportPath": "github.com/google/gofuzz",
|
||||||
"Rev": "44d81051d367757e1c7c6a5a86423ece9afcf63c"
|
"Rev": "44d81051d367757e1c7c6a5a86423ece9afcf63c"
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
"ImportPath": "github.com/hashicorp/golang-lru",
|
||||||
|
"Rev": "a0d98a5f288019575c6d1f4bb1573fef2d1fcdc4"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"ImportPath": "github.com/hashicorp/golang-lru/simplelru",
|
||||||
|
"Rev": "a0d98a5f288019575c6d1f4bb1573fef2d1fcdc4"
|
||||||
|
},
|
||||||
{
|
{
|
||||||
"ImportPath": "github.com/howeyc/gopass",
|
"ImportPath": "github.com/howeyc/gopass",
|
||||||
"Rev": "3ca23474a7c7203e0a0a070fd33508f6efdb9b3d"
|
"Rev": "3ca23474a7c7203e0a0a070fd33508f6efdb9b3d"
|
||||||
|
@ -28,6 +28,8 @@ type Indexer interface {
|
|||||||
Store
|
Store
|
||||||
// Retrieve list of objects that match on the named indexing function
|
// Retrieve list of objects that match on the named indexing function
|
||||||
Index(indexName string, obj interface{}) ([]interface{}, error)
|
Index(indexName string, obj interface{}) ([]interface{}, error)
|
||||||
|
// IndexKeys returns the set of keys that match on the named indexing function.
|
||||||
|
IndexKeys(indexName, indexKey string) ([]string, error)
|
||||||
// ListIndexFuncValues returns the list of generated values of an Index func
|
// ListIndexFuncValues returns the list of generated values of an Index func
|
||||||
ListIndexFuncValues(indexName string) []string
|
ListIndexFuncValues(indexName string) []string
|
||||||
// ByIndex lists object that match on the named indexing function with the exact key
|
// ByIndex lists object that match on the named indexing function with the exact key
|
||||||
|
220
staging/src/k8s.io/client-go/tools/cache/mutation_cache.go
vendored
Normal file
220
staging/src/k8s.io/client-go/tools/cache/mutation_cache.go
vendored
Normal file
@ -0,0 +1,220 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2017 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package cache
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"strconv"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
lru "github.com/hashicorp/golang-lru"
|
||||||
|
|
||||||
|
"k8s.io/apimachinery/pkg/api/meta"
|
||||||
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
|
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||||
|
)
|
||||||
|
|
||||||
|
// MutationCache is able to take the result of update operations and stores them in an LRU
|
||||||
|
// that can be used to provide a more current view of a requested object. It requires interpretting
|
||||||
|
// resourceVersions for comparisons.
|
||||||
|
// Implementations must be thread-safe.
|
||||||
|
// TODO find a way to layer this into an informer/lister
|
||||||
|
type MutationCache interface {
|
||||||
|
GetByKey(key string) (interface{}, bool, error)
|
||||||
|
ByIndex(indexName, indexKey string) ([]interface{}, error)
|
||||||
|
Mutation(interface{})
|
||||||
|
}
|
||||||
|
|
||||||
|
type ResourceVersionComparator interface {
|
||||||
|
CompareResourceVersion(lhs, rhs runtime.Object) int
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewIntegerResourceVersionMutationCache returns a MutationCache that understands how to
|
||||||
|
// deal with objects that have a resource version that:
|
||||||
|
//
|
||||||
|
// - is an integer
|
||||||
|
// - increases when updated
|
||||||
|
// - is comparable across the same resource in a namespace
|
||||||
|
//
|
||||||
|
// Most backends will have these semantics. Indexer may be nil.
|
||||||
|
func NewIntegerResourceVersionMutationCache(backingCache Store, indexer Indexer) MutationCache {
|
||||||
|
lru, err := lru.New(100)
|
||||||
|
if err != nil {
|
||||||
|
// errors only happen on invalid sizes, this would be programmer error
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &mutationCache{
|
||||||
|
backingCache: backingCache,
|
||||||
|
indexer: indexer,
|
||||||
|
mutationCache: lru,
|
||||||
|
comparator: etcdObjectVersioner{},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// mutationCache doesn't guarantee that it returns values added via Mutation since they can page out and
|
||||||
|
// since you can't distinguish between, "didn't observe create" and "was deleted after create",
|
||||||
|
// if the key is missing from the backing cache, we always return it as missing
|
||||||
|
type mutationCache struct {
|
||||||
|
lock sync.Mutex
|
||||||
|
backingCache Store
|
||||||
|
indexer Indexer
|
||||||
|
mutationCache *lru.Cache
|
||||||
|
|
||||||
|
comparator ResourceVersionComparator
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetByKey is never guaranteed to return back the value set in Mutation. It could be paged out, it could
|
||||||
|
// be older than another copy, the backingCache may be more recent or, you might have written twice into the same key.
|
||||||
|
// You get a value that was valid at some snapshot of time and will always return the newer of backingCache and mutationCache.
|
||||||
|
func (c *mutationCache) GetByKey(key string) (interface{}, bool, error) {
|
||||||
|
c.lock.Lock()
|
||||||
|
defer c.lock.Unlock()
|
||||||
|
|
||||||
|
obj, exists, err := c.backingCache.GetByKey(key)
|
||||||
|
if err != nil {
|
||||||
|
return nil, false, err
|
||||||
|
}
|
||||||
|
if !exists {
|
||||||
|
// we can't distinguish between, "didn't observe create" and "was deleted after create", so
|
||||||
|
// if the key is missing, we always return it as missing
|
||||||
|
return nil, false, nil
|
||||||
|
}
|
||||||
|
objRuntime, ok := obj.(runtime.Object)
|
||||||
|
if !ok {
|
||||||
|
return obj, true, nil
|
||||||
|
}
|
||||||
|
return c.newerObject(key, objRuntime), true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ByIndex returns the newer objects that match the provided index and indexer key.
|
||||||
|
// Will return an error if no indexer was provided.
|
||||||
|
func (c *mutationCache) ByIndex(name string, indexKey string) ([]interface{}, error) {
|
||||||
|
c.lock.Lock()
|
||||||
|
defer c.lock.Unlock()
|
||||||
|
if c.indexer == nil {
|
||||||
|
return nil, fmt.Errorf("no indexer has been provided to the mutation cache")
|
||||||
|
}
|
||||||
|
keys, err := c.indexer.IndexKeys(name, indexKey)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
var items []interface{}
|
||||||
|
for _, key := range keys {
|
||||||
|
obj, exists, err := c.indexer.GetByKey(key)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if !exists {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if objRuntime, ok := obj.(runtime.Object); ok {
|
||||||
|
items = append(items, c.newerObject(key, objRuntime))
|
||||||
|
} else {
|
||||||
|
items = append(items, obj)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return items, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// newerObject checks the mutation cache for a newer object and returns one if found. If the
|
||||||
|
// mutated object is older than the backing object, it is removed from the Must be
|
||||||
|
// called while the lock is held.
|
||||||
|
func (c *mutationCache) newerObject(key string, backing runtime.Object) runtime.Object {
|
||||||
|
mutatedObj, exists := c.mutationCache.Get(key)
|
||||||
|
if !exists {
|
||||||
|
return backing
|
||||||
|
}
|
||||||
|
mutatedObjRuntime, ok := mutatedObj.(runtime.Object)
|
||||||
|
if !ok {
|
||||||
|
return backing
|
||||||
|
}
|
||||||
|
if c.comparator.CompareResourceVersion(backing, mutatedObjRuntime) >= 0 {
|
||||||
|
c.mutationCache.Remove(key)
|
||||||
|
return backing
|
||||||
|
}
|
||||||
|
return mutatedObjRuntime
|
||||||
|
}
|
||||||
|
|
||||||
|
// Mutation adds a change to the cache that can be returned in GetByKey if it is newer than the backingCache
|
||||||
|
// copy. If you call Mutation twice with the same object on different threads, one will win, but its not defined
|
||||||
|
// which one. This doesn't affect correctness, since the GetByKey guaranteed of "later of these two caches" is
|
||||||
|
// preserved, but you may not get the version of the object you want. The object you get is only guaranteed to
|
||||||
|
// "one that was valid at some point in time", not "the one that I want".
|
||||||
|
func (c *mutationCache) Mutation(obj interface{}) {
|
||||||
|
c.lock.Lock()
|
||||||
|
defer c.lock.Unlock()
|
||||||
|
|
||||||
|
key, err := DeletionHandlingMetaNamespaceKeyFunc(obj)
|
||||||
|
if err != nil {
|
||||||
|
// this is a "nice to have", so failures shouldn't do anything weird
|
||||||
|
utilruntime.HandleError(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if objRuntime, ok := obj.(runtime.Object); ok {
|
||||||
|
if mutatedObj, exists := c.mutationCache.Get(key); exists {
|
||||||
|
if mutatedObjRuntime, ok := mutatedObj.(runtime.Object); ok {
|
||||||
|
if c.comparator.CompareResourceVersion(objRuntime, mutatedObjRuntime) < 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
c.mutationCache.Add(key, obj)
|
||||||
|
}
|
||||||
|
|
||||||
|
// etcdObjectVersioner implements versioning and extracting etcd node information
|
||||||
|
// for objects that have an embedded ObjectMeta or ListMeta field.
|
||||||
|
type etcdObjectVersioner struct{}
|
||||||
|
|
||||||
|
// ObjectResourceVersion implements Versioner
|
||||||
|
func (a etcdObjectVersioner) ObjectResourceVersion(obj runtime.Object) (uint64, error) {
|
||||||
|
accessor, err := meta.Accessor(obj)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
version := accessor.GetResourceVersion()
|
||||||
|
if len(version) == 0 {
|
||||||
|
return 0, nil
|
||||||
|
}
|
||||||
|
return strconv.ParseUint(version, 10, 64)
|
||||||
|
}
|
||||||
|
|
||||||
|
// CompareResourceVersion compares etcd resource versions. Outside this API they are all strings,
|
||||||
|
// but etcd resource versions are special, they're actually ints, so we can easily compare them.
|
||||||
|
func (a etcdObjectVersioner) CompareResourceVersion(lhs, rhs runtime.Object) int {
|
||||||
|
lhsVersion, err := a.ObjectResourceVersion(lhs)
|
||||||
|
if err != nil {
|
||||||
|
// coder error
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
rhsVersion, err := a.ObjectResourceVersion(rhs)
|
||||||
|
if err != nil {
|
||||||
|
// coder error
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if lhsVersion == rhsVersion {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
if lhsVersion < rhsVersion {
|
||||||
|
return -1
|
||||||
|
}
|
||||||
|
|
||||||
|
return 1
|
||||||
|
}
|
@ -172,6 +172,10 @@ func (c *cache) Index(indexName string, obj interface{}) ([]interface{}, error)
|
|||||||
return c.cacheStorage.Index(indexName, obj)
|
return c.cacheStorage.Index(indexName, obj)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *cache) IndexKeys(indexName, indexKey string) ([]string, error) {
|
||||||
|
return c.cacheStorage.IndexKeys(indexName, indexKey)
|
||||||
|
}
|
||||||
|
|
||||||
// ListIndexFuncValues returns the list of generated values of an Index func
|
// ListIndexFuncValues returns the list of generated values of an Index func
|
||||||
func (c *cache) ListIndexFuncValues(indexName string) []string {
|
func (c *cache) ListIndexFuncValues(indexName string) []string {
|
||||||
return c.cacheStorage.ListIndexFuncValues(indexName)
|
return c.cacheStorage.ListIndexFuncValues(indexName)
|
||||||
|
@ -43,6 +43,7 @@ type ThreadSafeStore interface {
|
|||||||
ListKeys() []string
|
ListKeys() []string
|
||||||
Replace(map[string]interface{}, string)
|
Replace(map[string]interface{}, string)
|
||||||
Index(indexName string, obj interface{}) ([]interface{}, error)
|
Index(indexName string, obj interface{}) ([]interface{}, error)
|
||||||
|
IndexKeys(indexName, indexKey string) ([]string, error)
|
||||||
ListIndexFuncValues(name string) []string
|
ListIndexFuncValues(name string) []string
|
||||||
ByIndex(indexName, indexKey string) ([]interface{}, error)
|
ByIndex(indexName, indexKey string) ([]interface{}, error)
|
||||||
GetIndexers() Indexers
|
GetIndexers() Indexers
|
||||||
@ -184,6 +185,23 @@ func (c *threadSafeMap) ByIndex(indexName, indexKey string) ([]interface{}, erro
|
|||||||
return list, nil
|
return list, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// IndexKeys returns a list of keys that match on the index function.
|
||||||
|
// IndexKeys is thread-safe so long as you treat all items as immutable.
|
||||||
|
func (c *threadSafeMap) IndexKeys(indexName, indexKey string) ([]string, error) {
|
||||||
|
c.lock.RLock()
|
||||||
|
defer c.lock.RUnlock()
|
||||||
|
|
||||||
|
indexFunc := c.indexers[indexName]
|
||||||
|
if indexFunc == nil {
|
||||||
|
return nil, fmt.Errorf("Index with name %s does not exist", indexName)
|
||||||
|
}
|
||||||
|
|
||||||
|
index := c.indices[indexName]
|
||||||
|
|
||||||
|
set := index[indexKey]
|
||||||
|
return set.List(), nil
|
||||||
|
}
|
||||||
|
|
||||||
func (c *threadSafeMap) ListIndexFuncValues(indexName string) []string {
|
func (c *threadSafeMap) ListIndexFuncValues(indexName string) []string {
|
||||||
c.lock.RLock()
|
c.lock.RLock()
|
||||||
defer c.lock.RUnlock()
|
defer c.lock.RUnlock()
|
||||||
|
@ -190,6 +190,14 @@
|
|||||||
"ImportPath": "github.com/grpc-ecosystem/grpc-gateway/utilities",
|
"ImportPath": "github.com/grpc-ecosystem/grpc-gateway/utilities",
|
||||||
"Rev": "84398b94e188ee336f307779b57b3aa91af7063c"
|
"Rev": "84398b94e188ee336f307779b57b3aa91af7063c"
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
"ImportPath": "github.com/hashicorp/golang-lru",
|
||||||
|
"Rev": "a0d98a5f288019575c6d1f4bb1573fef2d1fcdc4"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"ImportPath": "github.com/hashicorp/golang-lru/simplelru",
|
||||||
|
"Rev": "a0d98a5f288019575c6d1f4bb1573fef2d1fcdc4"
|
||||||
|
},
|
||||||
{
|
{
|
||||||
"ImportPath": "github.com/howeyc/gopass",
|
"ImportPath": "github.com/howeyc/gopass",
|
||||||
"Rev": "3ca23474a7c7203e0a0a070fd33508f6efdb9b3d"
|
"Rev": "3ca23474a7c7203e0a0a070fd33508f6efdb9b3d"
|
||||||
|
@ -182,6 +182,14 @@
|
|||||||
"ImportPath": "github.com/grpc-ecosystem/grpc-gateway/utilities",
|
"ImportPath": "github.com/grpc-ecosystem/grpc-gateway/utilities",
|
||||||
"Rev": "84398b94e188ee336f307779b57b3aa91af7063c"
|
"Rev": "84398b94e188ee336f307779b57b3aa91af7063c"
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
"ImportPath": "github.com/hashicorp/golang-lru",
|
||||||
|
"Rev": "a0d98a5f288019575c6d1f4bb1573fef2d1fcdc4"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"ImportPath": "github.com/hashicorp/golang-lru/simplelru",
|
||||||
|
"Rev": "a0d98a5f288019575c6d1f4bb1573fef2d1fcdc4"
|
||||||
|
},
|
||||||
{
|
{
|
||||||
"ImportPath": "github.com/howeyc/gopass",
|
"ImportPath": "github.com/howeyc/gopass",
|
||||||
"Rev": "3ca23474a7c7203e0a0a070fd33508f6efdb9b3d"
|
"Rev": "3ca23474a7c7203e0a0a070fd33508f6efdb9b3d"
|
||||||
|
@ -174,6 +174,14 @@
|
|||||||
"ImportPath": "github.com/grpc-ecosystem/grpc-gateway/utilities",
|
"ImportPath": "github.com/grpc-ecosystem/grpc-gateway/utilities",
|
||||||
"Rev": "84398b94e188ee336f307779b57b3aa91af7063c"
|
"Rev": "84398b94e188ee336f307779b57b3aa91af7063c"
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
"ImportPath": "github.com/hashicorp/golang-lru",
|
||||||
|
"Rev": "a0d98a5f288019575c6d1f4bb1573fef2d1fcdc4"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"ImportPath": "github.com/hashicorp/golang-lru/simplelru",
|
||||||
|
"Rev": "a0d98a5f288019575c6d1f4bb1573fef2d1fcdc4"
|
||||||
|
},
|
||||||
{
|
{
|
||||||
"ImportPath": "github.com/howeyc/gopass",
|
"ImportPath": "github.com/howeyc/gopass",
|
||||||
"Rev": "3ca23474a7c7203e0a0a070fd33508f6efdb9b3d"
|
"Rev": "3ca23474a7c7203e0a0a070fd33508f6efdb9b3d"
|
||||||
|
Loading…
Reference in New Issue
Block a user