From 7a2d63048d3fa39ecb19a54281916dbc3e72c36c Mon Sep 17 00:00:00 2001 From: derekwaynecarr Date: Wed, 4 Feb 2015 15:47:47 -0500 Subject: [PATCH] Enable look-up by secondary index in cache --- pkg/client/cache/store.go | 103 +++++++++++++++++++++++++++++++++ pkg/client/cache/store_test.go | 41 +++++++++++++ 2 files changed, 144 insertions(+) diff --git a/pkg/client/cache/store.go b/pkg/client/cache/store.go index 9f8595203d8..32bc2b30dc8 100644 --- a/pkg/client/cache/store.go +++ b/pkg/client/cache/store.go @@ -21,6 +21,7 @@ import ( "sync" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/meta" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" ) // Store is a generic object storage interface. Reflector knows how to watch a server @@ -59,12 +60,35 @@ func MetaNamespaceKeyFunc(obj interface{}) (string, error) { return meta.Namespace() + "/" + meta.Name(), nil } +// Index is a generic object storage interface that lets you list objects by their Index +type Index interface { + Store + Index(obj interface{}) ([]interface{}, error) +} + +// IndexFunc knows how to provide an indexed value for an object. +type IndexFunc func(obj interface{}) (string, error) + +// MetaNamespaceIndexFunc is a convenient default IndexFun which knows how to index +// an object by its namespace. +func MetaNamespaceIndexFunc(obj interface{}) (string, error) { + meta, err := meta.Accessor(obj) + if err != nil { + return "", fmt.Errorf("object has no meta: %v", err) + } + return meta.Namespace(), nil +} + type cache struct { lock sync.RWMutex items map[string]interface{} // keyFunc is used to make the key for objects stored in and retrieved from items, and // should be deterministic. keyFunc KeyFunc + // indexFunc is used to make the index value for objects stored in an retrieved from index + indexFunc IndexFunc + // maps the indexFunc value for an object to a set whose keys are keys in items + index map[string]util.StringSet } // Add inserts an item into the cache. @@ -76,6 +100,53 @@ func (c *cache) Add(obj interface{}) error { c.lock.Lock() defer c.lock.Unlock() c.items[key] = obj + c.updateIndex(obj) + return nil +} + +// updateIndex adds or modifies an object in the index +// it is intended to be called from a function that already has a lock on the cache +func (c *cache) updateIndex(obj interface{}) error { + if c.indexFunc == nil { + return nil + } + key, err := c.keyFunc(obj) + if err != nil { + return err + } + indexValue, err := c.indexFunc(obj) + if err != nil { + return err + } + set := c.index[indexValue] + if set == nil { + set = util.StringSet{} + c.index[indexValue] = set + } + set.Insert(key) + return nil +} + +// deleteFromIndex removes an entry from the index +// it is intended to be called from a function that already has a lock on the cache +func (c *cache) deleteFromIndex(obj interface{}) error { + if c.indexFunc == nil { + return nil + } + key, err := c.keyFunc(obj) + if err != nil { + return err + } + indexValue, err := c.indexFunc(obj) + if err != nil { + return err + } + set := c.index[indexValue] + if set == nil { + set = util.StringSet{} + c.index[indexValue] = set + } + set.Delete(key) return nil } @@ -88,6 +159,7 @@ func (c *cache) Update(obj interface{}) error { c.lock.Lock() defer c.lock.Unlock() c.items[key] = obj + c.updateIndex(obj) return nil } @@ -100,6 +172,7 @@ func (c *cache) Delete(obj interface{}) error { c.lock.Lock() defer c.lock.Unlock() delete(c.items, key) + c.deleteFromIndex(obj) return nil } @@ -115,6 +188,24 @@ func (c *cache) List() []interface{} { return list } +// Index returns a list of items that match on the index function +// Index is thread-safe so long as you treat all items as immutable +func (c *cache) Index(obj interface{}) ([]interface{}, error) { + c.lock.RLock() + defer c.lock.RUnlock() + + indexKey, err := c.indexFunc(obj) + if err != nil { + return nil, err + } + set := c.index[indexKey] + list := make([]interface{}, 0, set.Len()) + for _, key := range set.List() { + list = append(list, c.items[key]) + } + return list, nil +} + // Get returns the requested item, or sets exists=false. // Get is completely threadsafe as long as you treat all items as immutable. func (c *cache) Get(obj interface{}) (item interface{}, exists bool, err error) { @@ -150,6 +241,13 @@ func (c *cache) Replace(list []interface{}) error { c.lock.Lock() defer c.lock.Unlock() c.items = items + + // rebuild any index + c.index = map[string]util.StringSet{} + for _, item := range c.items { + c.updateIndex(item) + } + return nil } @@ -157,3 +255,8 @@ func (c *cache) Replace(list []interface{}) error { func NewStore(keyFunc KeyFunc) Store { return &cache{items: map[string]interface{}{}, keyFunc: keyFunc} } + +// NewIndex returns an Index implemented simply with a map and a lock. +func NewIndex(keyFunc KeyFunc, indexFunc IndexFunc) Index { + return &cache{items: map[string]interface{}{}, keyFunc: keyFunc, indexFunc: indexFunc, index: map[string]util.StringSet{}} +} diff --git a/pkg/client/cache/store_test.go b/pkg/client/cache/store_test.go index 3d29aec1f1c..9971c3a1d3f 100644 --- a/pkg/client/cache/store_test.go +++ b/pkg/client/cache/store_test.go @@ -86,10 +86,47 @@ func doTestStore(t *testing.T, store Store) { } } +// Test public interface +func doTestIndex(t *testing.T, index Index) { + mkObj := func(id string, val string) testStoreObject { + return testStoreObject{id: id, val: val} + } + + // Test Index + expected := map[string]util.StringSet{} + expected["b"] = util.NewStringSet("a", "c") + expected["f"] = util.NewStringSet("e") + expected["h"] = util.NewStringSet("g") + index.Add(mkObj("a", "b")) + index.Add(mkObj("c", "b")) + index.Add(mkObj("e", "f")) + index.Add(mkObj("g", "h")) + { + for k, v := range expected { + found := util.StringSet{} + indexResults, err := index.Index(mkObj("", k)) + if err != nil { + t.Errorf("Unexpected error %v", err) + } + for _, item := range indexResults { + found.Insert(item.(testStoreObject).id) + } + items := v.List() + if !found.HasAll(items...) { + t.Errorf("missing items, index %s, expected %v but found %v", k, items, found.List()) + } + } + } +} + func testStoreKeyFunc(obj interface{}) (string, error) { return obj.(testStoreObject).id, nil } +func testStoreIndexFunc(obj interface{}) (string, error) { + return obj.(testStoreObject).val, nil +} + type testStoreObject struct { id string val string @@ -107,3 +144,7 @@ func TestUndeltaStore(t *testing.T) { nop := func([]interface{}) {} doTestStore(t, NewUndeltaStore(nop, testStoreKeyFunc)) } + +func TestIndex(t *testing.T) { + doTestIndex(t, NewIndex(testStoreKeyFunc, testStoreIndexFunc)) +}