diff --git a/pkg/client/cache/index.go b/pkg/client/cache/index.go new file mode 100644 index 00000000000..f96d227e397 --- /dev/null +++ b/pkg/client/cache/index.go @@ -0,0 +1,52 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "fmt" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/meta" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" +) + +// Indexer is a storage interface that lets you list objects using multiple indexing functions +type Indexer interface { + Store + // Retrieve list of objects that match on the named indexing function + Index(indexName string, obj interface{}) ([]interface{}, error) +} + +// IndexFunc knows how to provide an indexed value for an object. +type IndexFunc func(obj interface{}) (string, error) + +// MetaNamespaceIndexFunc is a default index function that indexes based on an object's namespace +func MetaNamespaceIndexFunc(obj interface{}) (string, error) { + meta, err := meta.Accessor(obj) + if err != nil { + return "", fmt.Errorf("object has no meta: %v", err) + } + return meta.Namespace(), nil +} + +// Index maps the indexed value to a set of keys in the store that match on that value +type Index map[string]util.StringSet + +// Indexers maps a name to a IndexFunc +type Indexers map[string]IndexFunc + +// Indices maps a name to an Index +type Indices map[string]Index diff --git a/pkg/client/cache/store.go b/pkg/client/cache/store.go index 9f8595203d8..ab4a3a07920 100644 --- a/pkg/client/cache/store.go +++ b/pkg/client/cache/store.go @@ -21,6 +21,7 @@ import ( "sync" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/meta" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" ) // Store is a generic object storage interface. Reflector knows how to watch a server @@ -65,6 +66,10 @@ type cache struct { // keyFunc is used to make the key for objects stored in and retrieved from items, and // should be deterministic. keyFunc KeyFunc + // indexers maps a name to an IndexFunc + indexers Indexers + // indices maps a name to an Index + indices Indices } // Add inserts an item into the cache. @@ -73,9 +78,66 @@ func (c *cache) Add(obj interface{}) error { if err != nil { return fmt.Errorf("couldn't create key for object: %v", err) } + // keep a pointer to whatever could have been there previously c.lock.Lock() defer c.lock.Unlock() + oldObject := c.items[key] c.items[key] = obj + c.updateIndices(oldObject, obj) + return nil +} + +// updateIndices modifies the objects location in the managed indexes, if this is an update, you must provide an oldObj +// updateIndices must be called from a function that already has a lock on the cache +func (c *cache) updateIndices(oldObj interface{}, newObj interface{}) error { + // if we got an old object, we need to remove it before we add it again + if oldObj != nil { + c.deleteFromIndices(oldObj) + } + key, err := c.keyFunc(newObj) + if err != nil { + return err + } + for name, indexFunc := range c.indexers { + indexValue, err := indexFunc(newObj) + if err != nil { + return err + } + index := c.indices[name] + if index == nil { + index = Index{} + c.indices[name] = index + } + set := index[indexValue] + if set == nil { + set = util.StringSet{} + index[indexValue] = set + } + set.Insert(key) + } + return nil +} + +// deleteFromIndices removes the object from each of the managed indexes +// it is intended to be called from a function that already has a lock on the cache +func (c *cache) deleteFromIndices(obj interface{}) error { + key, err := c.keyFunc(obj) + if err != nil { + return err + } + for name, indexFunc := range c.indexers { + indexValue, err := indexFunc(obj) + if err != nil { + return err + } + index := c.indices[name] + if index != nil { + set := index[indexValue] + if set != nil { + set.Delete(key) + } + } + } return nil } @@ -87,7 +149,9 @@ func (c *cache) Update(obj interface{}) error { } c.lock.Lock() defer c.lock.Unlock() + oldObject := c.items[key] c.items[key] = obj + c.updateIndices(oldObject, obj) return nil } @@ -100,6 +164,7 @@ func (c *cache) Delete(obj interface{}) error { c.lock.Lock() defer c.lock.Unlock() delete(c.items, key) + c.deleteFromIndices(obj) return nil } @@ -115,6 +180,30 @@ func (c *cache) List() []interface{} { return list } +// Index returns a list of items that match on the index function +// Index is thread-safe so long as you treat all items as immutable +func (c *cache) Index(indexName string, obj interface{}) ([]interface{}, error) { + c.lock.RLock() + defer c.lock.RUnlock() + + indexFunc := c.indexers[indexName] + if indexFunc == nil { + return nil, fmt.Errorf("Index with name %s does not exist", indexName) + } + + indexKey, err := indexFunc(obj) + if err != nil { + return nil, err + } + index := c.indices[indexName] + set := index[indexKey] + list := make([]interface{}, 0, set.Len()) + for _, key := range set.List() { + list = append(list, c.items[key]) + } + return list, nil +} + // Get returns the requested item, or sets exists=false. // Get is completely threadsafe as long as you treat all items as immutable. func (c *cache) Get(obj interface{}) (item interface{}, exists bool, err error) { @@ -150,10 +239,22 @@ func (c *cache) Replace(list []interface{}) error { c.lock.Lock() defer c.lock.Unlock() c.items = items + + // rebuild any index + c.indices = Indices{} + for _, item := range c.items { + c.updateIndices(nil, item) + } + return nil } // NewStore returns a Store implemented simply with a map and a lock. func NewStore(keyFunc KeyFunc) Store { - return &cache{items: map[string]interface{}{}, keyFunc: keyFunc} + return &cache{items: map[string]interface{}{}, keyFunc: keyFunc, indexers: Indexers{}, indices: Indices{}} +} + +// NewIndexer returns an Indexer implemented simply with a map and a lock. +func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer { + return &cache{items: map[string]interface{}{}, keyFunc: keyFunc, indexers: indexers, indices: Indices{}} } diff --git a/pkg/client/cache/store_test.go b/pkg/client/cache/store_test.go index 3d29aec1f1c..ad2b6eda2c4 100644 --- a/pkg/client/cache/store_test.go +++ b/pkg/client/cache/store_test.go @@ -86,10 +86,53 @@ func doTestStore(t *testing.T, store Store) { } } +// Test public interface +func doTestIndex(t *testing.T, indexer Indexer) { + mkObj := func(id string, val string) testStoreObject { + return testStoreObject{id: id, val: val} + } + + // Test Index + expected := map[string]util.StringSet{} + expected["b"] = util.NewStringSet("a", "c") + expected["f"] = util.NewStringSet("e") + expected["h"] = util.NewStringSet("g") + indexer.Add(mkObj("a", "b")) + indexer.Add(mkObj("c", "b")) + indexer.Add(mkObj("e", "f")) + indexer.Add(mkObj("g", "h")) + { + for k, v := range expected { + found := util.StringSet{} + indexResults, err := indexer.Index("by_val", mkObj("", k)) + if err != nil { + t.Errorf("Unexpected error %v", err) + } + for _, item := range indexResults { + found.Insert(item.(testStoreObject).id) + } + items := v.List() + if !found.HasAll(items...) { + t.Errorf("missing items, index %s, expected %v but found %v", k, items, found.List()) + } + } + } +} + func testStoreKeyFunc(obj interface{}) (string, error) { return obj.(testStoreObject).id, nil } +func testStoreIndexFunc(obj interface{}) (string, error) { + return obj.(testStoreObject).val, nil +} + +func testStoreIndexers() Indexers { + indexers := Indexers{} + indexers["by_val"] = testStoreIndexFunc + return indexers +} + type testStoreObject struct { id string val string @@ -107,3 +150,7 @@ func TestUndeltaStore(t *testing.T) { nop := func([]interface{}) {} doTestStore(t, NewUndeltaStore(nop, testStoreKeyFunc)) } + +func TestIndex(t *testing.T) { + doTestIndex(t, NewIndexer(testStoreKeyFunc, testStoreIndexers())) +}