mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-24 20:24:09 +00:00
support multiple index values for a single object
This commit is contained in:
parent
c5bffaaf31
commit
9386db8c99
26
pkg/client/cache/index.go
vendored
26
pkg/client/cache/index.go
vendored
@ -30,18 +30,36 @@ type Indexer interface {
|
|||||||
Index(indexName string, obj interface{}) ([]interface{}, error)
|
Index(indexName string, obj interface{}) ([]interface{}, error)
|
||||||
// ListIndexFuncValues returns the list of generated values of an Index func
|
// ListIndexFuncValues returns the list of generated values of an Index func
|
||||||
ListIndexFuncValues(indexName string) []string
|
ListIndexFuncValues(indexName string) []string
|
||||||
|
// ByIndex lists object that match on the named indexing function with the exact key
|
||||||
|
ByIndex(indexName, indexKey string) ([]interface{}, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// IndexFunc knows how to provide an indexed value for an object.
|
// IndexFunc knows how to provide an indexed value for an object.
|
||||||
type IndexFunc func(obj interface{}) (string, error)
|
type IndexFunc func(obj interface{}) ([]string, error)
|
||||||
|
|
||||||
|
// IndexFuncToKeyFuncAdapter adapts an indexFunc to a keyFunc. This is only useful if your index function returns
|
||||||
|
// unique values for every object. This is conversion can create errors when more than one key is found. You
|
||||||
|
// should prefer to make proper key and index functions.
|
||||||
|
func IndexFuncToKeyFuncAdapter(indexFunc IndexFunc) KeyFunc {
|
||||||
|
return func(obj interface{}) (string, error) {
|
||||||
|
indexKeys, err := indexFunc(obj)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
if len(indexKeys) > 1 {
|
||||||
|
return "", fmt.Errorf("too many keys: %v", indexKeys)
|
||||||
|
}
|
||||||
|
return indexKeys[0], nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// MetaNamespaceIndexFunc is a default index function that indexes based on an object's namespace
|
// MetaNamespaceIndexFunc is a default index function that indexes based on an object's namespace
|
||||||
func MetaNamespaceIndexFunc(obj interface{}) (string, error) {
|
func MetaNamespaceIndexFunc(obj interface{}) ([]string, error) {
|
||||||
meta, err := meta.Accessor(obj)
|
meta, err := meta.Accessor(obj)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", fmt.Errorf("object has no meta: %v", err)
|
return []string{""}, fmt.Errorf("object has no meta: %v", err)
|
||||||
}
|
}
|
||||||
return meta.Namespace(), nil
|
return []string{meta.Namespace()}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Index maps the indexed value to a set of keys in the store that match on that value
|
// Index maps the indexed value to a set of keys in the store that match on that value
|
||||||
|
91
pkg/client/cache/index_test.go
vendored
91
pkg/client/cache/index_test.go
vendored
@ -17,13 +17,15 @@ limitations under the License.
|
|||||||
package cache
|
package cache
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||||
)
|
)
|
||||||
|
|
||||||
func testIndexFunc(obj interface{}) (string, error) {
|
func testIndexFunc(obj interface{}) ([]string, error) {
|
||||||
pod := obj.(*api.Pod)
|
pod := obj.(*api.Pod)
|
||||||
return pod.Labels["foo"], nil
|
return []string{pod.Labels["foo"]}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestGetIndexFuncValues(t *testing.T) {
|
func TestGetIndexFuncValues(t *testing.T) {
|
||||||
@ -48,3 +50,86 @@ func TestGetIndexFuncValues(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func testUsersIndexFunc(obj interface{}) ([]string, error) {
|
||||||
|
pod := obj.(*api.Pod)
|
||||||
|
usersString := pod.Annotations["users"]
|
||||||
|
|
||||||
|
return strings.Split(usersString, ","), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMultiIndexKeys(t *testing.T) {
|
||||||
|
index := NewIndexer(MetaNamespaceKeyFunc, Indexers{"byUser": testUsersIndexFunc})
|
||||||
|
|
||||||
|
pod1 := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "one", Annotations: map[string]string{"users": "ernie,bert"}}}
|
||||||
|
pod2 := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "two", Annotations: map[string]string{"users": "bert,oscar"}}}
|
||||||
|
pod3 := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "tre", Annotations: map[string]string{"users": "ernie,elmo"}}}
|
||||||
|
|
||||||
|
index.Add(pod1)
|
||||||
|
index.Add(pod2)
|
||||||
|
index.Add(pod3)
|
||||||
|
|
||||||
|
erniePods, err := index.ByIndex("byUser", "ernie")
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
if len(erniePods) != 2 {
|
||||||
|
t.Errorf("Expected 2 pods but got %v", len(erniePods))
|
||||||
|
}
|
||||||
|
|
||||||
|
bertPods, err := index.ByIndex("byUser", "bert")
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
if len(bertPods) != 2 {
|
||||||
|
t.Errorf("Expected 2 pods but got %v", len(bertPods))
|
||||||
|
}
|
||||||
|
|
||||||
|
oscarPods, err := index.ByIndex("byUser", "oscar")
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
if len(oscarPods) != 1 {
|
||||||
|
t.Errorf("Expected 1 pods but got %v", len(erniePods))
|
||||||
|
}
|
||||||
|
|
||||||
|
ernieAndBertKeys, err := index.Index("byUser", pod1)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
if len(ernieAndBertKeys) != 3 {
|
||||||
|
t.Errorf("Expected 3 pods but got %v", len(ernieAndBertKeys))
|
||||||
|
}
|
||||||
|
|
||||||
|
index.Delete(pod3)
|
||||||
|
erniePods, err = index.ByIndex("byUser", "ernie")
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
if len(erniePods) != 1 {
|
||||||
|
t.Errorf("Expected 1 pods but got %v", len(erniePods))
|
||||||
|
}
|
||||||
|
elmoPods, err := index.ByIndex("byUser", "elmo")
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
if len(elmoPods) != 0 {
|
||||||
|
t.Errorf("Expected 0 pods but got %v", len(elmoPods))
|
||||||
|
}
|
||||||
|
|
||||||
|
obj, err := api.Scheme.DeepCopy(pod2)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
copyOfPod2 := obj.(*api.Pod)
|
||||||
|
copyOfPod2.Annotations["users"] = "oscar"
|
||||||
|
index.Update(copyOfPod2)
|
||||||
|
bertPods, err = index.ByIndex("byUser", "bert")
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
if len(bertPods) != 1 {
|
||||||
|
t.Errorf("Expected 1 pods but got %v", len(bertPods))
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
4
pkg/client/cache/store.go
vendored
4
pkg/client/cache/store.go
vendored
@ -169,6 +169,10 @@ func (c *cache) ListIndexFuncValues(indexName string) []string {
|
|||||||
return c.cacheStorage.ListIndexFuncValues(indexName)
|
return c.cacheStorage.ListIndexFuncValues(indexName)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *cache) ByIndex(indexName, indexKey string) ([]interface{}, error) {
|
||||||
|
return c.cacheStorage.ByIndex(indexName, indexKey)
|
||||||
|
}
|
||||||
|
|
||||||
// Get returns the requested item, or sets exists=false.
|
// Get returns the requested item, or sets exists=false.
|
||||||
// Get is completely threadsafe as long as you treat all items as immutable.
|
// Get is completely threadsafe as long as you treat all items as immutable.
|
||||||
func (c *cache) Get(obj interface{}) (item interface{}, exists bool, err error) {
|
func (c *cache) Get(obj interface{}) (item interface{}, exists bool, err error) {
|
||||||
|
4
pkg/client/cache/store_test.go
vendored
4
pkg/client/cache/store_test.go
vendored
@ -123,8 +123,8 @@ func testStoreKeyFunc(obj interface{}) (string, error) {
|
|||||||
return obj.(testStoreObject).id, nil
|
return obj.(testStoreObject).id, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func testStoreIndexFunc(obj interface{}) (string, error) {
|
func testStoreIndexFunc(obj interface{}) ([]string, error) {
|
||||||
return obj.(testStoreObject).val, nil
|
return []string{obj.(testStoreObject).val}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func testStoreIndexers() Indexers {
|
func testStoreIndexers() Indexers {
|
||||||
|
61
pkg/client/cache/thread_safe_store.go
vendored
61
pkg/client/cache/thread_safe_store.go
vendored
@ -44,6 +44,7 @@ type ThreadSafeStore interface {
|
|||||||
Replace(map[string]interface{})
|
Replace(map[string]interface{})
|
||||||
Index(indexName string, obj interface{}) ([]interface{}, error)
|
Index(indexName string, obj interface{}) ([]interface{}, error)
|
||||||
ListIndexFuncValues(name string) []string
|
ListIndexFuncValues(name string) []string
|
||||||
|
ByIndex(indexName, indexKey string) ([]interface{}, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// threadSafeMap implements ThreadSafeStore
|
// threadSafeMap implements ThreadSafeStore
|
||||||
@ -134,16 +135,46 @@ func (c *threadSafeMap) Index(indexName string, obj interface{}) ([]interface{},
|
|||||||
return nil, fmt.Errorf("Index with name %s does not exist", indexName)
|
return nil, fmt.Errorf("Index with name %s does not exist", indexName)
|
||||||
}
|
}
|
||||||
|
|
||||||
indexKey, err := indexFunc(obj)
|
indexKeys, err := indexFunc(obj)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
index := c.indices[indexName]
|
index := c.indices[indexName]
|
||||||
|
|
||||||
|
// need to de-dupe the return list. Since multiple keys are allowed, this can happen.
|
||||||
|
returnKeySet := util.StringSet{}
|
||||||
|
for _, indexKey := range indexKeys {
|
||||||
|
set := index[indexKey]
|
||||||
|
for _, key := range set.List() {
|
||||||
|
returnKeySet.Insert(key)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
list := []interface{}{}
|
||||||
|
for absoluteKey := range returnKeySet {
|
||||||
|
list = append(list, c.items[absoluteKey])
|
||||||
|
}
|
||||||
|
return list, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ByIndex returns a list of items that match an exact value on the index function
|
||||||
|
func (c *threadSafeMap) ByIndex(indexName, indexKey string) ([]interface{}, error) {
|
||||||
|
c.lock.RLock()
|
||||||
|
defer c.lock.RUnlock()
|
||||||
|
|
||||||
|
indexFunc := c.indexers[indexName]
|
||||||
|
if indexFunc == nil {
|
||||||
|
return nil, fmt.Errorf("Index with name %s does not exist", indexName)
|
||||||
|
}
|
||||||
|
|
||||||
|
index := c.indices[indexName]
|
||||||
|
|
||||||
set := index[indexKey]
|
set := index[indexKey]
|
||||||
list := make([]interface{}, 0, set.Len())
|
list := make([]interface{}, 0, set.Len())
|
||||||
for _, key := range set.List() {
|
for _, key := range set.List() {
|
||||||
list = append(list, c.items[key])
|
list = append(list, c.items[key])
|
||||||
}
|
}
|
||||||
|
|
||||||
return list, nil
|
return list, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -164,7 +195,7 @@ func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, ke
|
|||||||
c.deleteFromIndices(oldObj, key)
|
c.deleteFromIndices(oldObj, key)
|
||||||
}
|
}
|
||||||
for name, indexFunc := range c.indexers {
|
for name, indexFunc := range c.indexers {
|
||||||
indexValue, err := indexFunc(newObj)
|
indexValues, err := indexFunc(newObj)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -173,12 +204,15 @@ func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, ke
|
|||||||
index = Index{}
|
index = Index{}
|
||||||
c.indices[name] = index
|
c.indices[name] = index
|
||||||
}
|
}
|
||||||
set := index[indexValue]
|
|
||||||
if set == nil {
|
for _, indexValue := range indexValues {
|
||||||
set = util.StringSet{}
|
set := index[indexValue]
|
||||||
index[indexValue] = set
|
if set == nil {
|
||||||
|
set = util.StringSet{}
|
||||||
|
index[indexValue] = set
|
||||||
|
}
|
||||||
|
set.Insert(key)
|
||||||
}
|
}
|
||||||
set.Insert(key)
|
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -187,15 +221,18 @@ func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, ke
|
|||||||
// it is intended to be called from a function that already has a lock on the cache
|
// it is intended to be called from a function that already has a lock on the cache
|
||||||
func (c *threadSafeMap) deleteFromIndices(obj interface{}, key string) error {
|
func (c *threadSafeMap) deleteFromIndices(obj interface{}, key string) error {
|
||||||
for name, indexFunc := range c.indexers {
|
for name, indexFunc := range c.indexers {
|
||||||
indexValue, err := indexFunc(obj)
|
indexValues, err := indexFunc(obj)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
index := c.indices[name]
|
index := c.indices[name]
|
||||||
if index != nil {
|
for _, indexValue := range indexValues {
|
||||||
set := index[indexValue]
|
if index != nil {
|
||||||
if set != nil {
|
set := index[indexValue]
|
||||||
set.Delete(key)
|
if set != nil {
|
||||||
|
set.Delete(key)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -34,12 +34,12 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
// nameIndexFunc is an index function that indexes based on an object's name
|
// nameIndexFunc is an index function that indexes based on an object's name
|
||||||
func nameIndexFunc(obj interface{}) (string, error) {
|
func nameIndexFunc(obj interface{}) ([]string, error) {
|
||||||
meta, err := meta.Accessor(obj)
|
meta, err := meta.Accessor(obj)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", fmt.Errorf("object has no meta: %v", err)
|
return []string{""}, fmt.Errorf("object has no meta: %v", err)
|
||||||
}
|
}
|
||||||
return meta.Name(), nil
|
return []string{meta.Name()}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// ServiceAccountsControllerOptions contains options for running a ServiceAccountsController
|
// ServiceAccountsControllerOptions contains options for running a ServiceAccountsController
|
||||||
|
@ -40,12 +40,12 @@ func NewPersistentVolumeOrderedIndex() *persistentVolumeOrderedIndex {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// accessModesIndexFunc is an indexing function that returns a persistent volume's AccessModes as a string
|
// accessModesIndexFunc is an indexing function that returns a persistent volume's AccessModes as a string
|
||||||
func accessModesIndexFunc(obj interface{}) (string, error) {
|
func accessModesIndexFunc(obj interface{}) ([]string, error) {
|
||||||
if pv, ok := obj.(*api.PersistentVolume); ok {
|
if pv, ok := obj.(*api.PersistentVolume); ok {
|
||||||
modes := volume.GetAccessModesAsString(pv.Spec.AccessModes)
|
modes := volume.GetAccessModesAsString(pv.Spec.AccessModes)
|
||||||
return modes, nil
|
return []string{modes}, nil
|
||||||
}
|
}
|
||||||
return "", fmt.Errorf("object is not a persistent volume: %v", obj)
|
return []string{""}, fmt.Errorf("object is not a persistent volume: %v", obj)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ListByAccessModes returns all volumes with the given set of AccessModeTypes *in order* of their storage capacity (low to high)
|
// ListByAccessModes returns all volumes with the given set of AccessModeTypes *in order* of their storage capacity (low to high)
|
||||||
|
@ -36,7 +36,7 @@ func TestAdmission(t *testing.T) {
|
|||||||
Phase: api.NamespaceActive,
|
Phase: api.NamespaceActive,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
store := cache.NewStore(cache.MetaNamespaceIndexFunc)
|
store := cache.NewStore(cache.IndexFuncToKeyFuncAdapter(cache.MetaNamespaceIndexFunc))
|
||||||
store.Add(namespaceObj)
|
store.Add(namespaceObj)
|
||||||
mockClient := &testclient.Fake{}
|
mockClient := &testclient.Fake{}
|
||||||
lfhandler := NewLifecycle(mockClient).(*lifecycle)
|
lfhandler := NewLifecycle(mockClient).(*lifecycle)
|
||||||
|
Loading…
Reference in New Issue
Block a user