From 7a2d63048d3fa39ecb19a54281916dbc3e72c36c Mon Sep 17 00:00:00 2001 From: derekwaynecarr Date: Wed, 4 Feb 2015 15:47:47 -0500 Subject: [PATCH 1/3] Enable look-up by secondary index in cache --- pkg/client/cache/store.go | 103 +++++++++++++++++++++++++++++++++ pkg/client/cache/store_test.go | 41 +++++++++++++ 2 files changed, 144 insertions(+) diff --git a/pkg/client/cache/store.go b/pkg/client/cache/store.go index 9f8595203d8..32bc2b30dc8 100644 --- a/pkg/client/cache/store.go +++ b/pkg/client/cache/store.go @@ -21,6 +21,7 @@ import ( "sync" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/meta" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" ) // Store is a generic object storage interface. Reflector knows how to watch a server @@ -59,12 +60,35 @@ func MetaNamespaceKeyFunc(obj interface{}) (string, error) { return meta.Namespace() + "/" + meta.Name(), nil } +// Index is a generic object storage interface that lets you list objects by their Index +type Index interface { + Store + Index(obj interface{}) ([]interface{}, error) +} + +// IndexFunc knows how to provide an indexed value for an object. +type IndexFunc func(obj interface{}) (string, error) + +// MetaNamespaceIndexFunc is a convenient default IndexFun which knows how to index +// an object by its namespace. +func MetaNamespaceIndexFunc(obj interface{}) (string, error) { + meta, err := meta.Accessor(obj) + if err != nil { + return "", fmt.Errorf("object has no meta: %v", err) + } + return meta.Namespace(), nil +} + type cache struct { lock sync.RWMutex items map[string]interface{} // keyFunc is used to make the key for objects stored in and retrieved from items, and // should be deterministic. keyFunc KeyFunc + // indexFunc is used to make the index value for objects stored in an retrieved from index + indexFunc IndexFunc + // maps the indexFunc value for an object to a set whose keys are keys in items + index map[string]util.StringSet } // Add inserts an item into the cache. @@ -76,6 +100,53 @@ func (c *cache) Add(obj interface{}) error { c.lock.Lock() defer c.lock.Unlock() c.items[key] = obj + c.updateIndex(obj) + return nil +} + +// updateIndex adds or modifies an object in the index +// it is intended to be called from a function that already has a lock on the cache +func (c *cache) updateIndex(obj interface{}) error { + if c.indexFunc == nil { + return nil + } + key, err := c.keyFunc(obj) + if err != nil { + return err + } + indexValue, err := c.indexFunc(obj) + if err != nil { + return err + } + set := c.index[indexValue] + if set == nil { + set = util.StringSet{} + c.index[indexValue] = set + } + set.Insert(key) + return nil +} + +// deleteFromIndex removes an entry from the index +// it is intended to be called from a function that already has a lock on the cache +func (c *cache) deleteFromIndex(obj interface{}) error { + if c.indexFunc == nil { + return nil + } + key, err := c.keyFunc(obj) + if err != nil { + return err + } + indexValue, err := c.indexFunc(obj) + if err != nil { + return err + } + set := c.index[indexValue] + if set == nil { + set = util.StringSet{} + c.index[indexValue] = set + } + set.Delete(key) return nil } @@ -88,6 +159,7 @@ func (c *cache) Update(obj interface{}) error { c.lock.Lock() defer c.lock.Unlock() c.items[key] = obj + c.updateIndex(obj) return nil } @@ -100,6 +172,7 @@ func (c *cache) Delete(obj interface{}) error { c.lock.Lock() defer c.lock.Unlock() delete(c.items, key) + c.deleteFromIndex(obj) return nil } @@ -115,6 +188,24 @@ func (c *cache) List() []interface{} { return list } +// Index returns a list of items that match on the index function +// Index is thread-safe so long as you treat all items as immutable +func (c *cache) Index(obj interface{}) ([]interface{}, error) { + c.lock.RLock() + defer c.lock.RUnlock() + + indexKey, err := c.indexFunc(obj) + if err != nil { + return nil, err + } + set := c.index[indexKey] + list := make([]interface{}, 0, set.Len()) + for _, key := range set.List() { + list = append(list, c.items[key]) + } + return list, nil +} + // Get returns the requested item, or sets exists=false. // Get is completely threadsafe as long as you treat all items as immutable. func (c *cache) Get(obj interface{}) (item interface{}, exists bool, err error) { @@ -150,6 +241,13 @@ func (c *cache) Replace(list []interface{}) error { c.lock.Lock() defer c.lock.Unlock() c.items = items + + // rebuild any index + c.index = map[string]util.StringSet{} + for _, item := range c.items { + c.updateIndex(item) + } + return nil } @@ -157,3 +255,8 @@ func (c *cache) Replace(list []interface{}) error { func NewStore(keyFunc KeyFunc) Store { return &cache{items: map[string]interface{}{}, keyFunc: keyFunc} } + +// NewIndex returns an Index implemented simply with a map and a lock. +func NewIndex(keyFunc KeyFunc, indexFunc IndexFunc) Index { + return &cache{items: map[string]interface{}{}, keyFunc: keyFunc, indexFunc: indexFunc, index: map[string]util.StringSet{}} +} diff --git a/pkg/client/cache/store_test.go b/pkg/client/cache/store_test.go index 3d29aec1f1c..9971c3a1d3f 100644 --- a/pkg/client/cache/store_test.go +++ b/pkg/client/cache/store_test.go @@ -86,10 +86,47 @@ func doTestStore(t *testing.T, store Store) { } } +// Test public interface +func doTestIndex(t *testing.T, index Index) { + mkObj := func(id string, val string) testStoreObject { + return testStoreObject{id: id, val: val} + } + + // Test Index + expected := map[string]util.StringSet{} + expected["b"] = util.NewStringSet("a", "c") + expected["f"] = util.NewStringSet("e") + expected["h"] = util.NewStringSet("g") + index.Add(mkObj("a", "b")) + index.Add(mkObj("c", "b")) + index.Add(mkObj("e", "f")) + index.Add(mkObj("g", "h")) + { + for k, v := range expected { + found := util.StringSet{} + indexResults, err := index.Index(mkObj("", k)) + if err != nil { + t.Errorf("Unexpected error %v", err) + } + for _, item := range indexResults { + found.Insert(item.(testStoreObject).id) + } + items := v.List() + if !found.HasAll(items...) { + t.Errorf("missing items, index %s, expected %v but found %v", k, items, found.List()) + } + } + } +} + func testStoreKeyFunc(obj interface{}) (string, error) { return obj.(testStoreObject).id, nil } +func testStoreIndexFunc(obj interface{}) (string, error) { + return obj.(testStoreObject).val, nil +} + type testStoreObject struct { id string val string @@ -107,3 +144,7 @@ func TestUndeltaStore(t *testing.T) { nop := func([]interface{}) {} doTestStore(t, NewUndeltaStore(nop, testStoreKeyFunc)) } + +func TestIndex(t *testing.T) { + doTestIndex(t, NewIndex(testStoreKeyFunc, testStoreIndexFunc)) +} From 1cf69bdefc817e38a2faf34a00aa7e4e4f74c52f Mon Sep 17 00:00:00 2001 From: derekwaynecarr Date: Thu, 5 Feb 2015 13:22:28 -0500 Subject: [PATCH 2/3] Support multiple index functions, address feedback --- pkg/client/cache/index.go | 52 ++++++++++++++ pkg/client/cache/store.go | 119 +++++++++++++++------------------ pkg/client/cache/store_test.go | 20 ++++-- 3 files changed, 120 insertions(+), 71 deletions(-) create mode 100644 pkg/client/cache/index.go diff --git a/pkg/client/cache/index.go b/pkg/client/cache/index.go new file mode 100644 index 00000000000..f96d227e397 --- /dev/null +++ b/pkg/client/cache/index.go @@ -0,0 +1,52 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "fmt" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/meta" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" +) + +// Indexer is a storage interface that lets you list objects using multiple indexing functions +type Indexer interface { + Store + // Retrieve list of objects that match on the named indexing function + Index(indexName string, obj interface{}) ([]interface{}, error) +} + +// IndexFunc knows how to provide an indexed value for an object. +type IndexFunc func(obj interface{}) (string, error) + +// MetaNamespaceIndexFunc is a default index function that indexes based on an object's namespace +func MetaNamespaceIndexFunc(obj interface{}) (string, error) { + meta, err := meta.Accessor(obj) + if err != nil { + return "", fmt.Errorf("object has no meta: %v", err) + } + return meta.Namespace(), nil +} + +// Index maps the indexed value to a set of keys in the store that match on that value +type Index map[string]util.StringSet + +// Indexers maps a name to a IndexFunc +type Indexers map[string]IndexFunc + +// Indices maps a name to an Index +type Indices map[string]Index diff --git a/pkg/client/cache/store.go b/pkg/client/cache/store.go index 32bc2b30dc8..00f7c90c020 100644 --- a/pkg/client/cache/store.go +++ b/pkg/client/cache/store.go @@ -60,35 +60,16 @@ func MetaNamespaceKeyFunc(obj interface{}) (string, error) { return meta.Namespace() + "/" + meta.Name(), nil } -// Index is a generic object storage interface that lets you list objects by their Index -type Index interface { - Store - Index(obj interface{}) ([]interface{}, error) -} - -// IndexFunc knows how to provide an indexed value for an object. -type IndexFunc func(obj interface{}) (string, error) - -// MetaNamespaceIndexFunc is a convenient default IndexFun which knows how to index -// an object by its namespace. -func MetaNamespaceIndexFunc(obj interface{}) (string, error) { - meta, err := meta.Accessor(obj) - if err != nil { - return "", fmt.Errorf("object has no meta: %v", err) - } - return meta.Namespace(), nil -} - type cache struct { lock sync.RWMutex items map[string]interface{} // keyFunc is used to make the key for objects stored in and retrieved from items, and // should be deterministic. keyFunc KeyFunc - // indexFunc is used to make the index value for objects stored in an retrieved from index - indexFunc IndexFunc - // maps the indexFunc value for an object to a set whose keys are keys in items - index map[string]util.StringSet + // indexers maps a name to an IndexFunc + indexers Indexers + // indices maps a name to an Index + indices Indices } // Add inserts an item into the cache. @@ -100,53 +81,57 @@ func (c *cache) Add(obj interface{}) error { c.lock.Lock() defer c.lock.Unlock() c.items[key] = obj - c.updateIndex(obj) + c.updateIndices(obj) return nil } -// updateIndex adds or modifies an object in the index -// it is intended to be called from a function that already has a lock on the cache -func (c *cache) updateIndex(obj interface{}) error { - if c.indexFunc == nil { - return nil - } +// updateIndices modifies the objects location in the managed indexes +// updateIndices must be called from a function that already has a lock on the cache +func (c *cache) updateIndices(obj interface{}) error { key, err := c.keyFunc(obj) if err != nil { return err } - indexValue, err := c.indexFunc(obj) - if err != nil { - return err + for name, indexFunc := range c.indexers { + indexValue, err := indexFunc(obj) + if err != nil { + return err + } + index := c.indices[name] + if index == nil { + index = Index{} + c.indices[name] = index + } + set := index[indexValue] + if set == nil { + set = util.StringSet{} + index[indexValue] = set + } + set.Insert(key) } - set := c.index[indexValue] - if set == nil { - set = util.StringSet{} - c.index[indexValue] = set - } - set.Insert(key) return nil } -// deleteFromIndex removes an entry from the index +// deleteFromIndices removes the object from each of the managed indexes // it is intended to be called from a function that already has a lock on the cache -func (c *cache) deleteFromIndex(obj interface{}) error { - if c.indexFunc == nil { - return nil - } +func (c *cache) deleteFromIndices(obj interface{}) error { key, err := c.keyFunc(obj) if err != nil { return err } - indexValue, err := c.indexFunc(obj) - if err != nil { - return err + for name, indexFunc := range c.indexers { + indexValue, err := indexFunc(obj) + if err != nil { + return err + } + index := c.indices[name] + if index != nil { + set := index[indexValue] + if set != nil { + set.Delete(key) + } + } } - set := c.index[indexValue] - if set == nil { - set = util.StringSet{} - c.index[indexValue] = set - } - set.Delete(key) return nil } @@ -159,7 +144,7 @@ func (c *cache) Update(obj interface{}) error { c.lock.Lock() defer c.lock.Unlock() c.items[key] = obj - c.updateIndex(obj) + c.updateIndices(obj) return nil } @@ -172,7 +157,7 @@ func (c *cache) Delete(obj interface{}) error { c.lock.Lock() defer c.lock.Unlock() delete(c.items, key) - c.deleteFromIndex(obj) + c.deleteFromIndices(obj) return nil } @@ -190,15 +175,21 @@ func (c *cache) List() []interface{} { // Index returns a list of items that match on the index function // Index is thread-safe so long as you treat all items as immutable -func (c *cache) Index(obj interface{}) ([]interface{}, error) { +func (c *cache) Index(indexName string, obj interface{}) ([]interface{}, error) { c.lock.RLock() defer c.lock.RUnlock() - indexKey, err := c.indexFunc(obj) + indexFunc := c.indexers[indexName] + if indexFunc == nil { + return nil, fmt.Errorf("Index with name %s does not exist", indexName) + } + + indexKey, err := indexFunc(obj) if err != nil { return nil, err } - set := c.index[indexKey] + index := c.indices[indexName] + set := index[indexKey] list := make([]interface{}, 0, set.Len()) for _, key := range set.List() { list = append(list, c.items[key]) @@ -243,9 +234,9 @@ func (c *cache) Replace(list []interface{}) error { c.items = items // rebuild any index - c.index = map[string]util.StringSet{} + c.indices = Indices{} for _, item := range c.items { - c.updateIndex(item) + c.updateIndices(item) } return nil @@ -253,10 +244,10 @@ func (c *cache) Replace(list []interface{}) error { // NewStore returns a Store implemented simply with a map and a lock. func NewStore(keyFunc KeyFunc) Store { - return &cache{items: map[string]interface{}{}, keyFunc: keyFunc} + return &cache{items: map[string]interface{}{}, keyFunc: keyFunc, indexers: Indexers{}, indices: Indices{}} } -// NewIndex returns an Index implemented simply with a map and a lock. -func NewIndex(keyFunc KeyFunc, indexFunc IndexFunc) Index { - return &cache{items: map[string]interface{}{}, keyFunc: keyFunc, indexFunc: indexFunc, index: map[string]util.StringSet{}} +// NewIndexer returns an Indexer implemented simply with a map and a lock. +func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer { + return &cache{items: map[string]interface{}{}, keyFunc: keyFunc, indexers: indexers, indices: Indices{}} } diff --git a/pkg/client/cache/store_test.go b/pkg/client/cache/store_test.go index 9971c3a1d3f..ad2b6eda2c4 100644 --- a/pkg/client/cache/store_test.go +++ b/pkg/client/cache/store_test.go @@ -87,7 +87,7 @@ func doTestStore(t *testing.T, store Store) { } // Test public interface -func doTestIndex(t *testing.T, index Index) { +func doTestIndex(t *testing.T, indexer Indexer) { mkObj := func(id string, val string) testStoreObject { return testStoreObject{id: id, val: val} } @@ -97,14 +97,14 @@ func doTestIndex(t *testing.T, index Index) { expected["b"] = util.NewStringSet("a", "c") expected["f"] = util.NewStringSet("e") expected["h"] = util.NewStringSet("g") - index.Add(mkObj("a", "b")) - index.Add(mkObj("c", "b")) - index.Add(mkObj("e", "f")) - index.Add(mkObj("g", "h")) + indexer.Add(mkObj("a", "b")) + indexer.Add(mkObj("c", "b")) + indexer.Add(mkObj("e", "f")) + indexer.Add(mkObj("g", "h")) { for k, v := range expected { found := util.StringSet{} - indexResults, err := index.Index(mkObj("", k)) + indexResults, err := indexer.Index("by_val", mkObj("", k)) if err != nil { t.Errorf("Unexpected error %v", err) } @@ -127,6 +127,12 @@ func testStoreIndexFunc(obj interface{}) (string, error) { return obj.(testStoreObject).val, nil } +func testStoreIndexers() Indexers { + indexers := Indexers{} + indexers["by_val"] = testStoreIndexFunc + return indexers +} + type testStoreObject struct { id string val string @@ -146,5 +152,5 @@ func TestUndeltaStore(t *testing.T) { } func TestIndex(t *testing.T) { - doTestIndex(t, NewIndex(testStoreKeyFunc, testStoreIndexFunc)) + doTestIndex(t, NewIndexer(testStoreKeyFunc, testStoreIndexers())) } From 53f1efa484e03f417e1faddb5ce33a45f1f2afbd Mon Sep 17 00:00:00 2001 From: derekwaynecarr Date: Mon, 9 Feb 2015 16:34:44 -0500 Subject: [PATCH 3/3] Make sure we remove old object from index before adding new --- pkg/client/cache/store.go | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/pkg/client/cache/store.go b/pkg/client/cache/store.go index 00f7c90c020..ab4a3a07920 100644 --- a/pkg/client/cache/store.go +++ b/pkg/client/cache/store.go @@ -78,22 +78,28 @@ func (c *cache) Add(obj interface{}) error { if err != nil { return fmt.Errorf("couldn't create key for object: %v", err) } + // keep a pointer to whatever could have been there previously c.lock.Lock() defer c.lock.Unlock() + oldObject := c.items[key] c.items[key] = obj - c.updateIndices(obj) + c.updateIndices(oldObject, obj) return nil } -// updateIndices modifies the objects location in the managed indexes +// updateIndices modifies the objects location in the managed indexes, if this is an update, you must provide an oldObj // updateIndices must be called from a function that already has a lock on the cache -func (c *cache) updateIndices(obj interface{}) error { - key, err := c.keyFunc(obj) +func (c *cache) updateIndices(oldObj interface{}, newObj interface{}) error { + // if we got an old object, we need to remove it before we add it again + if oldObj != nil { + c.deleteFromIndices(oldObj) + } + key, err := c.keyFunc(newObj) if err != nil { return err } for name, indexFunc := range c.indexers { - indexValue, err := indexFunc(obj) + indexValue, err := indexFunc(newObj) if err != nil { return err } @@ -143,8 +149,9 @@ func (c *cache) Update(obj interface{}) error { } c.lock.Lock() defer c.lock.Unlock() + oldObject := c.items[key] c.items[key] = obj - c.updateIndices(obj) + c.updateIndices(oldObject, obj) return nil } @@ -236,7 +243,7 @@ func (c *cache) Replace(list []interface{}) error { // rebuild any index c.indices = Indices{} for _, item := range c.items { - c.updateIndices(item) + c.updateIndices(nil, item) } return nil