diff --git a/pkg/client/cache/index.go b/pkg/client/cache/index.go index 82c7a7c8188..31379e7dfe7 100644 --- a/pkg/client/cache/index.go +++ b/pkg/client/cache/index.go @@ -30,18 +30,36 @@ type Indexer interface { Index(indexName string, obj interface{}) ([]interface{}, error) // ListIndexFuncValues returns the list of generated values of an Index func ListIndexFuncValues(indexName string) []string + // ByIndex lists object that match on the named indexing function with the exact key + ByIndex(indexName, indexKey string) ([]interface{}, error) } // IndexFunc knows how to provide an indexed value for an object. -type IndexFunc func(obj interface{}) (string, error) +type IndexFunc func(obj interface{}) ([]string, error) + +// IndexFuncToKeyFuncAdapter adapts an indexFunc to a keyFunc. This is only useful if your index function returns +// unique values for every object. This is conversion can create errors when more than one key is found. You +// should prefer to make proper key and index functions. +func IndexFuncToKeyFuncAdapter(indexFunc IndexFunc) KeyFunc { + return func(obj interface{}) (string, error) { + indexKeys, err := indexFunc(obj) + if err != nil { + return "", err + } + if len(indexKeys) > 1 { + return "", fmt.Errorf("too many keys: %v", indexKeys) + } + return indexKeys[0], nil + } +} // MetaNamespaceIndexFunc is a default index function that indexes based on an object's namespace -func MetaNamespaceIndexFunc(obj interface{}) (string, error) { +func MetaNamespaceIndexFunc(obj interface{}) ([]string, error) { meta, err := meta.Accessor(obj) if err != nil { - return "", fmt.Errorf("object has no meta: %v", err) + return []string{""}, fmt.Errorf("object has no meta: %v", err) } - return meta.Namespace(), nil + return []string{meta.Namespace()}, nil } // Index maps the indexed value to a set of keys in the store that match on that value diff --git a/pkg/client/cache/index_test.go b/pkg/client/cache/index_test.go index 12985a82640..d978203aed0 100644 --- a/pkg/client/cache/index_test.go +++ b/pkg/client/cache/index_test.go @@ -17,13 +17,15 @@ limitations under the License. package cache import ( - "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "strings" "testing" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" ) -func testIndexFunc(obj interface{}) (string, error) { +func testIndexFunc(obj interface{}) ([]string, error) { pod := obj.(*api.Pod) - return pod.Labels["foo"], nil + return []string{pod.Labels["foo"]}, nil } func TestGetIndexFuncValues(t *testing.T) { @@ -48,3 +50,86 @@ func TestGetIndexFuncValues(t *testing.T) { } } } + +func testUsersIndexFunc(obj interface{}) ([]string, error) { + pod := obj.(*api.Pod) + usersString := pod.Annotations["users"] + + return strings.Split(usersString, ","), nil +} + +func TestMultiIndexKeys(t *testing.T) { + index := NewIndexer(MetaNamespaceKeyFunc, Indexers{"byUser": testUsersIndexFunc}) + + pod1 := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "one", Annotations: map[string]string{"users": "ernie,bert"}}} + pod2 := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "two", Annotations: map[string]string{"users": "bert,oscar"}}} + pod3 := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "tre", Annotations: map[string]string{"users": "ernie,elmo"}}} + + index.Add(pod1) + index.Add(pod2) + index.Add(pod3) + + erniePods, err := index.ByIndex("byUser", "ernie") + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if len(erniePods) != 2 { + t.Errorf("Expected 2 pods but got %v", len(erniePods)) + } + + bertPods, err := index.ByIndex("byUser", "bert") + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if len(bertPods) != 2 { + t.Errorf("Expected 2 pods but got %v", len(bertPods)) + } + + oscarPods, err := index.ByIndex("byUser", "oscar") + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if len(oscarPods) != 1 { + t.Errorf("Expected 1 pods but got %v", len(erniePods)) + } + + ernieAndBertKeys, err := index.Index("byUser", pod1) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if len(ernieAndBertKeys) != 3 { + t.Errorf("Expected 3 pods but got %v", len(ernieAndBertKeys)) + } + + index.Delete(pod3) + erniePods, err = index.ByIndex("byUser", "ernie") + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if len(erniePods) != 1 { + t.Errorf("Expected 1 pods but got %v", len(erniePods)) + } + elmoPods, err := index.ByIndex("byUser", "elmo") + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if len(elmoPods) != 0 { + t.Errorf("Expected 0 pods but got %v", len(elmoPods)) + } + + obj, err := api.Scheme.DeepCopy(pod2) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + copyOfPod2 := obj.(*api.Pod) + copyOfPod2.Annotations["users"] = "oscar" + index.Update(copyOfPod2) + bertPods, err = index.ByIndex("byUser", "bert") + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if len(bertPods) != 1 { + t.Errorf("Expected 1 pods but got %v", len(bertPods)) + } + +} diff --git a/pkg/client/cache/store.go b/pkg/client/cache/store.go index 1f103b61c3d..78e309bcec6 100644 --- a/pkg/client/cache/store.go +++ b/pkg/client/cache/store.go @@ -169,6 +169,10 @@ func (c *cache) ListIndexFuncValues(indexName string) []string { return c.cacheStorage.ListIndexFuncValues(indexName) } +func (c *cache) ByIndex(indexName, indexKey string) ([]interface{}, error) { + return c.cacheStorage.ByIndex(indexName, indexKey) +} + // Get returns the requested item, or sets exists=false. // Get is completely threadsafe as long as you treat all items as immutable. func (c *cache) Get(obj interface{}) (item interface{}, exists bool, err error) { diff --git a/pkg/client/cache/store_test.go b/pkg/client/cache/store_test.go index 1886e1c120b..15b07bbc89e 100644 --- a/pkg/client/cache/store_test.go +++ b/pkg/client/cache/store_test.go @@ -123,8 +123,8 @@ func testStoreKeyFunc(obj interface{}) (string, error) { return obj.(testStoreObject).id, nil } -func testStoreIndexFunc(obj interface{}) (string, error) { - return obj.(testStoreObject).val, nil +func testStoreIndexFunc(obj interface{}) ([]string, error) { + return []string{obj.(testStoreObject).val}, nil } func testStoreIndexers() Indexers { diff --git a/pkg/client/cache/thread_safe_store.go b/pkg/client/cache/thread_safe_store.go index 81495832e8f..453fa48de6e 100644 --- a/pkg/client/cache/thread_safe_store.go +++ b/pkg/client/cache/thread_safe_store.go @@ -44,6 +44,7 @@ type ThreadSafeStore interface { Replace(map[string]interface{}) Index(indexName string, obj interface{}) ([]interface{}, error) ListIndexFuncValues(name string) []string + ByIndex(indexName, indexKey string) ([]interface{}, error) } // threadSafeMap implements ThreadSafeStore @@ -134,16 +135,46 @@ func (c *threadSafeMap) Index(indexName string, obj interface{}) ([]interface{}, return nil, fmt.Errorf("Index with name %s does not exist", indexName) } - indexKey, err := indexFunc(obj) + indexKeys, err := indexFunc(obj) if err != nil { return nil, err } index := c.indices[indexName] + + // need to de-dupe the return list. Since multiple keys are allowed, this can happen. + returnKeySet := util.StringSet{} + for _, indexKey := range indexKeys { + set := index[indexKey] + for _, key := range set.List() { + returnKeySet.Insert(key) + } + } + + list := []interface{}{} + for absoluteKey := range returnKeySet { + list = append(list, c.items[absoluteKey]) + } + return list, nil +} + +// ByIndex returns a list of items that match an exact value on the index function +func (c *threadSafeMap) ByIndex(indexName, indexKey string) ([]interface{}, error) { + c.lock.RLock() + defer c.lock.RUnlock() + + indexFunc := c.indexers[indexName] + if indexFunc == nil { + return nil, fmt.Errorf("Index with name %s does not exist", indexName) + } + + index := c.indices[indexName] + set := index[indexKey] list := make([]interface{}, 0, set.Len()) for _, key := range set.List() { list = append(list, c.items[key]) } + return list, nil } @@ -164,7 +195,7 @@ func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, ke c.deleteFromIndices(oldObj, key) } for name, indexFunc := range c.indexers { - indexValue, err := indexFunc(newObj) + indexValues, err := indexFunc(newObj) if err != nil { return err } @@ -173,12 +204,15 @@ func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, ke index = Index{} c.indices[name] = index } - set := index[indexValue] - if set == nil { - set = util.StringSet{} - index[indexValue] = set + + for _, indexValue := range indexValues { + set := index[indexValue] + if set == nil { + set = util.StringSet{} + index[indexValue] = set + } + set.Insert(key) } - set.Insert(key) } return nil } @@ -187,15 +221,18 @@ func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, ke // it is intended to be called from a function that already has a lock on the cache func (c *threadSafeMap) deleteFromIndices(obj interface{}, key string) error { for name, indexFunc := range c.indexers { - indexValue, err := indexFunc(obj) + indexValues, err := indexFunc(obj) if err != nil { return err } + index := c.indices[name] - if index != nil { - set := index[indexValue] - if set != nil { - set.Delete(key) + for _, indexValue := range indexValues { + if index != nil { + set := index[indexValue] + if set != nil { + set.Delete(key) + } } } } diff --git a/pkg/serviceaccount/serviceaccounts_controller.go b/pkg/serviceaccount/serviceaccounts_controller.go index 8da08d65847..3da154c4a4c 100644 --- a/pkg/serviceaccount/serviceaccounts_controller.go +++ b/pkg/serviceaccount/serviceaccounts_controller.go @@ -34,12 +34,12 @@ import ( ) // nameIndexFunc is an index function that indexes based on an object's name -func nameIndexFunc(obj interface{}) (string, error) { +func nameIndexFunc(obj interface{}) ([]string, error) { meta, err := meta.Accessor(obj) if err != nil { - return "", fmt.Errorf("object has no meta: %v", err) + return []string{""}, fmt.Errorf("object has no meta: %v", err) } - return meta.Name(), nil + return []string{meta.Name()}, nil } // ServiceAccountsControllerOptions contains options for running a ServiceAccountsController diff --git a/pkg/volumeclaimbinder/types.go b/pkg/volumeclaimbinder/types.go index ae17119f094..b3f99b7510d 100644 --- a/pkg/volumeclaimbinder/types.go +++ b/pkg/volumeclaimbinder/types.go @@ -40,12 +40,12 @@ func NewPersistentVolumeOrderedIndex() *persistentVolumeOrderedIndex { } // accessModesIndexFunc is an indexing function that returns a persistent volume's AccessModes as a string -func accessModesIndexFunc(obj interface{}) (string, error) { +func accessModesIndexFunc(obj interface{}) ([]string, error) { if pv, ok := obj.(*api.PersistentVolume); ok { modes := volume.GetAccessModesAsString(pv.Spec.AccessModes) - return modes, nil + return []string{modes}, nil } - return "", fmt.Errorf("object is not a persistent volume: %v", obj) + return []string{""}, fmt.Errorf("object is not a persistent volume: %v", obj) } // ListByAccessModes returns all volumes with the given set of AccessModeTypes *in order* of their storage capacity (low to high) diff --git a/plugin/pkg/admission/namespace/lifecycle/admission_test.go b/plugin/pkg/admission/namespace/lifecycle/admission_test.go index 7e8bde7756f..73b7d71d10b 100644 --- a/plugin/pkg/admission/namespace/lifecycle/admission_test.go +++ b/plugin/pkg/admission/namespace/lifecycle/admission_test.go @@ -36,7 +36,7 @@ func TestAdmission(t *testing.T) { Phase: api.NamespaceActive, }, } - store := cache.NewStore(cache.MetaNamespaceIndexFunc) + store := cache.NewStore(cache.IndexFuncToKeyFuncAdapter(cache.MetaNamespaceIndexFunc)) store.Add(namespaceObj) mockClient := &testclient.Fake{} lfhandler := NewLifecycle(mockClient).(*lifecycle)