mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 19:56:01 +00:00
Merge pull request #4125 from derekwaynecarr/store_by_namespace
Enable look-up by secondary index in cache
This commit is contained in:
commit
371349f858
52
pkg/client/cache/index.go
vendored
Normal file
52
pkg/client/cache/index.go
vendored
Normal file
@ -0,0 +1,52 @@
|
||||
/*
|
||||
Copyright 2014 Google Inc. All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package cache
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/meta"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||
)
|
||||
|
||||
// Indexer is a storage interface that lets you list objects using multiple indexing functions
|
||||
type Indexer interface {
|
||||
Store
|
||||
// Retrieve list of objects that match on the named indexing function
|
||||
Index(indexName string, obj interface{}) ([]interface{}, error)
|
||||
}
|
||||
|
||||
// IndexFunc knows how to provide an indexed value for an object.
|
||||
type IndexFunc func(obj interface{}) (string, error)
|
||||
|
||||
// MetaNamespaceIndexFunc is a default index function that indexes based on an object's namespace
|
||||
func MetaNamespaceIndexFunc(obj interface{}) (string, error) {
|
||||
meta, err := meta.Accessor(obj)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("object has no meta: %v", err)
|
||||
}
|
||||
return meta.Namespace(), nil
|
||||
}
|
||||
|
||||
// Index maps the indexed value to a set of keys in the store that match on that value
|
||||
type Index map[string]util.StringSet
|
||||
|
||||
// Indexers maps a name to a IndexFunc
|
||||
type Indexers map[string]IndexFunc
|
||||
|
||||
// Indices maps a name to an Index
|
||||
type Indices map[string]Index
|
103
pkg/client/cache/store.go
vendored
103
pkg/client/cache/store.go
vendored
@ -21,6 +21,7 @@ import (
|
||||
"sync"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/meta"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||
)
|
||||
|
||||
// Store is a generic object storage interface. Reflector knows how to watch a server
|
||||
@ -65,6 +66,10 @@ type cache struct {
|
||||
// keyFunc is used to make the key for objects stored in and retrieved from items, and
|
||||
// should be deterministic.
|
||||
keyFunc KeyFunc
|
||||
// indexers maps a name to an IndexFunc
|
||||
indexers Indexers
|
||||
// indices maps a name to an Index
|
||||
indices Indices
|
||||
}
|
||||
|
||||
// Add inserts an item into the cache.
|
||||
@ -73,9 +78,66 @@ func (c *cache) Add(obj interface{}) error {
|
||||
if err != nil {
|
||||
return fmt.Errorf("couldn't create key for object: %v", err)
|
||||
}
|
||||
// keep a pointer to whatever could have been there previously
|
||||
c.lock.Lock()
|
||||
defer c.lock.Unlock()
|
||||
oldObject := c.items[key]
|
||||
c.items[key] = obj
|
||||
c.updateIndices(oldObject, obj)
|
||||
return nil
|
||||
}
|
||||
|
||||
// updateIndices modifies the objects location in the managed indexes, if this is an update, you must provide an oldObj
|
||||
// updateIndices must be called from a function that already has a lock on the cache
|
||||
func (c *cache) updateIndices(oldObj interface{}, newObj interface{}) error {
|
||||
// if we got an old object, we need to remove it before we add it again
|
||||
if oldObj != nil {
|
||||
c.deleteFromIndices(oldObj)
|
||||
}
|
||||
key, err := c.keyFunc(newObj)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for name, indexFunc := range c.indexers {
|
||||
indexValue, err := indexFunc(newObj)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
index := c.indices[name]
|
||||
if index == nil {
|
||||
index = Index{}
|
||||
c.indices[name] = index
|
||||
}
|
||||
set := index[indexValue]
|
||||
if set == nil {
|
||||
set = util.StringSet{}
|
||||
index[indexValue] = set
|
||||
}
|
||||
set.Insert(key)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// deleteFromIndices removes the object from each of the managed indexes
|
||||
// it is intended to be called from a function that already has a lock on the cache
|
||||
func (c *cache) deleteFromIndices(obj interface{}) error {
|
||||
key, err := c.keyFunc(obj)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for name, indexFunc := range c.indexers {
|
||||
indexValue, err := indexFunc(obj)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
index := c.indices[name]
|
||||
if index != nil {
|
||||
set := index[indexValue]
|
||||
if set != nil {
|
||||
set.Delete(key)
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -87,7 +149,9 @@ func (c *cache) Update(obj interface{}) error {
|
||||
}
|
||||
c.lock.Lock()
|
||||
defer c.lock.Unlock()
|
||||
oldObject := c.items[key]
|
||||
c.items[key] = obj
|
||||
c.updateIndices(oldObject, obj)
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -100,6 +164,7 @@ func (c *cache) Delete(obj interface{}) error {
|
||||
c.lock.Lock()
|
||||
defer c.lock.Unlock()
|
||||
delete(c.items, key)
|
||||
c.deleteFromIndices(obj)
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -115,6 +180,30 @@ func (c *cache) List() []interface{} {
|
||||
return list
|
||||
}
|
||||
|
||||
// Index returns a list of items that match on the index function
|
||||
// Index is thread-safe so long as you treat all items as immutable
|
||||
func (c *cache) Index(indexName string, obj interface{}) ([]interface{}, error) {
|
||||
c.lock.RLock()
|
||||
defer c.lock.RUnlock()
|
||||
|
||||
indexFunc := c.indexers[indexName]
|
||||
if indexFunc == nil {
|
||||
return nil, fmt.Errorf("Index with name %s does not exist", indexName)
|
||||
}
|
||||
|
||||
indexKey, err := indexFunc(obj)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
index := c.indices[indexName]
|
||||
set := index[indexKey]
|
||||
list := make([]interface{}, 0, set.Len())
|
||||
for _, key := range set.List() {
|
||||
list = append(list, c.items[key])
|
||||
}
|
||||
return list, nil
|
||||
}
|
||||
|
||||
// Get returns the requested item, or sets exists=false.
|
||||
// Get is completely threadsafe as long as you treat all items as immutable.
|
||||
func (c *cache) Get(obj interface{}) (item interface{}, exists bool, err error) {
|
||||
@ -150,10 +239,22 @@ func (c *cache) Replace(list []interface{}) error {
|
||||
c.lock.Lock()
|
||||
defer c.lock.Unlock()
|
||||
c.items = items
|
||||
|
||||
// rebuild any index
|
||||
c.indices = Indices{}
|
||||
for _, item := range c.items {
|
||||
c.updateIndices(nil, item)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// NewStore returns a Store implemented simply with a map and a lock.
|
||||
func NewStore(keyFunc KeyFunc) Store {
|
||||
return &cache{items: map[string]interface{}{}, keyFunc: keyFunc}
|
||||
return &cache{items: map[string]interface{}{}, keyFunc: keyFunc, indexers: Indexers{}, indices: Indices{}}
|
||||
}
|
||||
|
||||
// NewIndexer returns an Indexer implemented simply with a map and a lock.
|
||||
func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer {
|
||||
return &cache{items: map[string]interface{}{}, keyFunc: keyFunc, indexers: indexers, indices: Indices{}}
|
||||
}
|
||||
|
47
pkg/client/cache/store_test.go
vendored
47
pkg/client/cache/store_test.go
vendored
@ -86,10 +86,53 @@ func doTestStore(t *testing.T, store Store) {
|
||||
}
|
||||
}
|
||||
|
||||
// Test public interface
|
||||
func doTestIndex(t *testing.T, indexer Indexer) {
|
||||
mkObj := func(id string, val string) testStoreObject {
|
||||
return testStoreObject{id: id, val: val}
|
||||
}
|
||||
|
||||
// Test Index
|
||||
expected := map[string]util.StringSet{}
|
||||
expected["b"] = util.NewStringSet("a", "c")
|
||||
expected["f"] = util.NewStringSet("e")
|
||||
expected["h"] = util.NewStringSet("g")
|
||||
indexer.Add(mkObj("a", "b"))
|
||||
indexer.Add(mkObj("c", "b"))
|
||||
indexer.Add(mkObj("e", "f"))
|
||||
indexer.Add(mkObj("g", "h"))
|
||||
{
|
||||
for k, v := range expected {
|
||||
found := util.StringSet{}
|
||||
indexResults, err := indexer.Index("by_val", mkObj("", k))
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error %v", err)
|
||||
}
|
||||
for _, item := range indexResults {
|
||||
found.Insert(item.(testStoreObject).id)
|
||||
}
|
||||
items := v.List()
|
||||
if !found.HasAll(items...) {
|
||||
t.Errorf("missing items, index %s, expected %v but found %v", k, items, found.List())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func testStoreKeyFunc(obj interface{}) (string, error) {
|
||||
return obj.(testStoreObject).id, nil
|
||||
}
|
||||
|
||||
func testStoreIndexFunc(obj interface{}) (string, error) {
|
||||
return obj.(testStoreObject).val, nil
|
||||
}
|
||||
|
||||
func testStoreIndexers() Indexers {
|
||||
indexers := Indexers{}
|
||||
indexers["by_val"] = testStoreIndexFunc
|
||||
return indexers
|
||||
}
|
||||
|
||||
type testStoreObject struct {
|
||||
id string
|
||||
val string
|
||||
@ -107,3 +150,7 @@ func TestUndeltaStore(t *testing.T) {
|
||||
nop := func([]interface{}) {}
|
||||
doTestStore(t, NewUndeltaStore(nop, testStoreKeyFunc))
|
||||
}
|
||||
|
||||
func TestIndex(t *testing.T) {
|
||||
doTestIndex(t, NewIndexer(testStoreKeyFunc, testStoreIndexers()))
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user