add indexer for storage cacher

Signed-off-by: shaloulcy <lcy041536@gmail.com>
This commit is contained in:
shaloulcy 2019-11-19 16:52:07 +08:00
parent 5caeec8023
commit 87582e2c3c
11 changed files with 171 additions and 28 deletions

View File

@ -23,6 +23,7 @@ go_library(
"//staging/src/k8s.io/apiserver/pkg/storage:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
],
)

View File

@ -22,6 +22,7 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/storagebackend"
"k8s.io/client-go/tools/cache"
)
// RESTOptions is set of configuration options to generic registries.
@ -49,4 +50,5 @@ type StoreOptions struct {
RESTOptions RESTOptionsGetter
TriggerFunc storage.IndexerFuncs
AttrFunc storage.AttrFunc
Indexers *cache.Indexers
}

View File

@ -85,6 +85,7 @@ go_library(
"//staging/src/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/dryrun:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
)

View File

@ -28,6 +28,7 @@ import (
"k8s.io/apiserver/pkg/storage/etcd3"
"k8s.io/apiserver/pkg/storage/storagebackend"
"k8s.io/apiserver/pkg/storage/storagebackend/factory"
"k8s.io/client-go/tools/cache"
)
// Creates a cacher based given storageConfig.
@ -39,7 +40,8 @@ func StorageWithCacher(capacity int) generic.StorageDecorator {
newFunc func() runtime.Object,
newListFunc func() runtime.Object,
getAttrsFunc storage.AttrFunc,
triggerFuncs storage.IndexerFuncs) (storage.Interface, factory.DestroyFunc, error) {
triggerFuncs storage.IndexerFuncs,
indexers *cache.Indexers) (storage.Interface, factory.DestroyFunc, error) {
s, d, err := generic.NewRawStorage(storageConfig)
if err != nil {
@ -65,6 +67,7 @@ func StorageWithCacher(capacity int) generic.StorageDecorator {
NewListFunc: newListFunc,
GetAttrsFunc: getAttrsFunc,
IndexerFuncs: triggerFuncs,
Indexers: indexers,
Codec: storageConfig.Codec,
}
cacher, err := cacherstorage.NewCacherFromConfig(cacherConfig)

View File

@ -1322,6 +1322,7 @@ func (e *Store) CompleteWithOptions(options *generic.StoreOptions) error {
e.NewListFunc,
attrFunc,
options.TriggerFunc,
options.Indexers,
)
if err != nil {
return err

View File

@ -21,6 +21,7 @@ import (
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/storagebackend"
"k8s.io/apiserver/pkg/storage/storagebackend/factory"
"k8s.io/client-go/tools/cache"
)
// StorageDecorator is a function signature for producing a storage.Interface
@ -32,7 +33,8 @@ type StorageDecorator func(
newFunc func() runtime.Object,
newListFunc func() runtime.Object,
getAttrsFunc storage.AttrFunc,
trigger storage.IndexerFuncs) (storage.Interface, factory.DestroyFunc, error)
trigger storage.IndexerFuncs,
indexers *cache.Indexers) (storage.Interface, factory.DestroyFunc, error)
// UndecoratedStorage returns the given a new storage from the given config
// without any decoration.
@ -43,7 +45,8 @@ func UndecoratedStorage(
newFunc func() runtime.Object,
newListFunc func() runtime.Object,
getAttrsFunc storage.AttrFunc,
trigger storage.IndexerFuncs) (storage.Interface, factory.DestroyFunc, error) {
trigger storage.IndexerFuncs,
indexers *cache.Indexers) (storage.Interface, factory.DestroyFunc, error) {
return NewRawStorage(config)
}

View File

@ -100,6 +100,10 @@ type Config struct {
// needs to process an incoming event.
IndexerFuncs storage.IndexerFuncs
// Indexers is used to accelerate the list operation, falls back to regular list
// operation if no indexer found.
Indexers *cache.Indexers
// NewFunc is a function that creates new empty object storing a object of type Type.
NewFunc func() runtime.Object
@ -367,7 +371,7 @@ func NewCacherFromConfig(config Config) (*Cacher, error) {
}
watchCache := newWatchCache(
config.CacheCapacity, config.KeyFunc, cacher.processEvent, config.GetAttrsFunc, config.Versioner)
config.CacheCapacity, config.KeyFunc, cacher.processEvent, config.GetAttrsFunc, config.Versioner, config.Indexers)
listerWatcher := NewCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
reflectorName := "storage/cacher.go:" + config.ResourcePrefix
@ -701,7 +705,7 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, p
}
filter := filterWithAttrsFunction(key, pred)
objs, readResourceVersion, err := c.watchCache.WaitUntilFreshAndList(listRV, trace)
objs, readResourceVersion, err := c.watchCache.WaitUntilFreshAndList(listRV, pred.MatcherIndex(), trace)
if err != nil {
return err
}

View File

@ -82,6 +82,35 @@ func storeElementKey(obj interface{}) (string, error) {
return elem.Key, nil
}
func storeElementObject(obj interface{}) (runtime.Object, error) {
elem, ok := obj.(*storeElement)
if !ok {
return nil, fmt.Errorf("not a storeElement: %v", obj)
}
return elem.Object, nil
}
func storeElementIndexFunc(objIndexFunc cache.IndexFunc) cache.IndexFunc {
return func(obj interface{}) (strings []string, e error) {
seo, err := storeElementObject(obj)
if err != nil {
return nil, err
}
return objIndexFunc(seo)
}
}
func storeElementIndexers(indexers *cache.Indexers) cache.Indexers {
if indexers == nil {
return cache.Indexers{}
}
ret := cache.Indexers{}
for indexName, indexFunc := range *indexers {
ret[indexName] = storeElementIndexFunc(indexFunc)
}
return ret
}
// watchCache implements a Store interface.
// However, it depends on the elements implementing runtime.Object interface.
//
@ -116,7 +145,7 @@ type watchCache struct {
// history" i.e. from the moment just after the newest cached watched event.
// It is necessary to effectively allow clients to start watching at now.
// NOTE: We assume that <store> is thread-safe.
store cache.Store
store cache.Indexer
// ResourceVersion up to which the watchCache is propagated.
resourceVersion uint64
@ -143,7 +172,8 @@ func newWatchCache(
keyFunc func(runtime.Object) (string, error),
eventHandler func(*watchCacheEvent),
getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, error),
versioner storage.Versioner) *watchCache {
versioner storage.Versioner,
indexers *cache.Indexers) *watchCache {
wc := &watchCache{
capacity: capacity,
keyFunc: keyFunc,
@ -151,7 +181,7 @@ func newWatchCache(
cache: make([]*watchCacheEvent, capacity),
startIndex: 0,
endIndex: 0,
store: cache.NewStore(storeElementKey),
store: cache.NewIndexer(storeElementKey, storeElementIndexers(indexers)),
resourceVersion: 0,
listResourceVersion: 0,
eventHandler: eventHandler,
@ -319,12 +349,19 @@ func (w *watchCache) waitUntilFreshAndBlock(resourceVersion uint64, trace *utilt
}
// WaitUntilFreshAndList returns list of pointers to <storeElement> objects.
func (w *watchCache) WaitUntilFreshAndList(resourceVersion uint64, trace *utiltrace.Trace) ([]interface{}, uint64, error) {
func (w *watchCache) WaitUntilFreshAndList(resourceVersion uint64, matchValues []storage.MatchValue, trace *utiltrace.Trace) ([]interface{}, uint64, error) {
err := w.waitUntilFreshAndBlock(resourceVersion, trace)
defer w.RUnlock()
if err != nil {
return nil, 0, err
}
// TODO: use the smallest key size index.
for _, matchValue := range matchValues {
if result, err := w.store.ByIndex(matchValue.IndexName, matchValue.Value); err == nil {
return result, w.resourceVersion, nil
}
}
return w.store.List(), w.resourceVersion, nil
}

View File

@ -39,17 +39,19 @@ import (
)
func makeTestPod(name string, resourceVersion uint64) *v1.Pod {
return makeTestPodDetails(name, resourceVersion, "some-node", map[string]string{"k8s-app": "my-app"})
}
func makeTestPodDetails(name string, resourceVersion uint64, nodeName string, labels map[string]string) *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns",
Name: name,
ResourceVersion: strconv.FormatUint(resourceVersion, 10),
Labels: map[string]string{
"k8s-app": "my-app",
},
Labels: labels,
},
Spec: v1.PodSpec{
NodeName: "some-node",
NodeName: nodeName,
},
}
}
@ -64,7 +66,7 @@ func makeTestStoreElement(pod *v1.Pod) *storeElement {
}
// newTestWatchCache just adds a fake clock.
func newTestWatchCache(capacity int) *watchCache {
func newTestWatchCache(capacity int, indexers *cache.Indexers) *watchCache {
keyFunc := func(obj runtime.Object) (string, error) {
return storage.NamespaceKeyFunc("prefix", obj)
}
@ -77,13 +79,13 @@ func newTestWatchCache(capacity int) *watchCache {
}
versioner := etcd3.APIObjectVersioner{}
mockHandler := func(*watchCacheEvent) {}
wc := newWatchCache(capacity, keyFunc, mockHandler, getAttrsFunc, versioner)
wc := newWatchCache(capacity, keyFunc, mockHandler, getAttrsFunc, versioner, indexers)
wc.clock = clock.NewFakeClock(time.Now())
return wc
}
func TestWatchCacheBasic(t *testing.T) {
store := newTestWatchCache(2)
store := newTestWatchCache(2, &cache.Indexers{})
// Test Add/Update/Delete.
pod1 := makeTestPod("pod", 1)
@ -160,7 +162,7 @@ func TestWatchCacheBasic(t *testing.T) {
}
func TestEvents(t *testing.T) {
store := newTestWatchCache(5)
store := newTestWatchCache(5, &cache.Indexers{})
store.Add(makeTestPod("pod", 3))
@ -280,7 +282,7 @@ func TestEvents(t *testing.T) {
}
func TestMarker(t *testing.T) {
store := newTestWatchCache(3)
store := newTestWatchCache(3, &cache.Indexers{})
// First thing that is called when propagated from storage is Replace.
store.Replace([]interface{}{
@ -315,15 +317,51 @@ func TestMarker(t *testing.T) {
}
func TestWaitUntilFreshAndList(t *testing.T) {
store := newTestWatchCache(3)
store := newTestWatchCache(3, &cache.Indexers{
"label": func(obj interface{}) ([]string, error) {
pod, ok := obj.(*v1.Pod)
if !ok {
return nil, fmt.Errorf("not a pod %#v", obj)
}
if value, ok := pod.Labels["label"]; ok {
return []string{value}, nil
}
return nil, nil
},
"spec.nodeName": func(obj interface{}) ([]string, error) {
pod, ok := obj.(*v1.Pod)
if !ok {
return nil, fmt.Errorf("not a pod %#v", obj)
}
return []string{pod.Spec.NodeName}, nil
},
})
// In background, update the store.
go func() {
store.Add(makeTestPod("foo", 2))
store.Add(makeTestPod("bar", 5))
store.Add(makeTestPodDetails("pod1", 2, "node1", map[string]string{"label": "value1"}))
store.Add(makeTestPodDetails("pod2", 3, "node1", map[string]string{"label": "value1"}))
store.Add(makeTestPodDetails("pod3", 5, "node2", map[string]string{"label": "value2"}))
}()
list, resourceVersion, err := store.WaitUntilFreshAndList(5, nil)
// list by empty MatchValues.
list, resourceVersion, err := store.WaitUntilFreshAndList(5, nil, nil)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if resourceVersion != 5 {
t.Errorf("unexpected resourceVersion: %v, expected: 5", resourceVersion)
}
if len(list) != 3 {
t.Errorf("unexpected list returned: %#v", list)
}
// list by label index.
matchValues := []storage.MatchValue{
{IndexName: "label", Value: "value1"},
{IndexName: "spec.nodeName", Value: "node2"},
}
list, resourceVersion, err = store.WaitUntilFreshAndList(5, matchValues, nil)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
@ -333,10 +371,38 @@ func TestWaitUntilFreshAndList(t *testing.T) {
if len(list) != 2 {
t.Errorf("unexpected list returned: %#v", list)
}
// list with spec.nodeName index.
matchValues = []storage.MatchValue{
{IndexName: "not-exist-label", Value: "whatever"},
{IndexName: "spec.nodeName", Value: "node2"},
}
list, resourceVersion, err = store.WaitUntilFreshAndList(5, matchValues, nil)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if resourceVersion != 5 {
t.Errorf("unexpected resourceVersion: %v, expected: 5", resourceVersion)
}
if len(list) != 1 {
t.Errorf("unexpected list returned: %#v", list)
}
// list with index not exists.
matchValues = []storage.MatchValue{
{IndexName: "not-exist-label", Value: "whatever"},
}
list, resourceVersion, err = store.WaitUntilFreshAndList(5, matchValues, nil)
if resourceVersion != 5 {
t.Errorf("unexpected resourceVersion: %v, expected: 5", resourceVersion)
}
if len(list) != 3 {
t.Errorf("unexpected list returned: %#v", list)
}
}
func TestWaitUntilFreshAndGet(t *testing.T) {
store := newTestWatchCache(3)
store := newTestWatchCache(3, &cache.Indexers{})
// In background, update the store.
go func() {
@ -361,7 +427,7 @@ func TestWaitUntilFreshAndGet(t *testing.T) {
}
func TestWaitUntilFreshAndListTimeout(t *testing.T) {
store := newTestWatchCache(3)
store := newTestWatchCache(3, &cache.Indexers{})
fc := store.clock.(*clock.FakeClock)
// In background, step clock after the below call starts the timer.
@ -378,7 +444,7 @@ func TestWaitUntilFreshAndListTimeout(t *testing.T) {
store.Add(makeTestPod("bar", 5))
}()
_, _, err := store.WaitUntilFreshAndList(5, nil)
_, _, err := store.WaitUntilFreshAndList(5, nil, nil)
if !errors.IsTimeout(err) {
t.Errorf("expected timeout error but got: %v", err)
}
@ -400,10 +466,10 @@ func (t *testLW) Watch(options metav1.ListOptions) (watch.Interface, error) {
}
func TestReflectorForWatchCache(t *testing.T) {
store := newTestWatchCache(5)
store := newTestWatchCache(5, &cache.Indexers{})
{
_, version, err := store.WaitUntilFreshAndList(0, nil)
_, version, err := store.WaitUntilFreshAndList(0, nil, nil)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
@ -426,7 +492,7 @@ func TestReflectorForWatchCache(t *testing.T) {
r.ListAndWatch(wait.NeverStop)
{
_, version, err := store.WaitUntilFreshAndList(10, nil)
_, version, err := store.WaitUntilFreshAndList(10, nil, nil)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

View File

@ -87,6 +87,12 @@ var Everything = SelectionPredicate{
Field: fields.Everything(),
}
// MatchValue defines a pair (<index name>, <value for that index>).
type MatchValue struct {
IndexName string
Value string
}
// Pass an UpdateFunc to Interface.GuaranteedUpdate to make an update
// that is guaranteed to succeed.
// See the comment for GuaranteedUpdate for more details.

View File

@ -74,6 +74,7 @@ type SelectionPredicate struct {
Label labels.Selector
Field fields.Selector
GetAttrs AttrFunc
IndexLabels []string
IndexFields []string
Limit int64
Continue string
@ -128,3 +129,21 @@ func (s *SelectionPredicate) MatchesSingle() (string, bool) {
func (s *SelectionPredicate) Empty() bool {
return s.Label.Empty() && s.Field.Empty()
}
// For any index defined by IndexFields, if a matcher can match only (a subset)
// of objects that return <value> for a given index, a pair (<index name>, <value>)
// wil be returned.
func (s *SelectionPredicate) MatcherIndex() []MatchValue {
var result []MatchValue
for _, field := range s.IndexFields {
if value, ok := s.Field.RequiresExactMatch(field); ok {
result = append(result, MatchValue{IndexName: field, Value: value})
}
}
for _, label := range s.IndexLabels {
if value, ok := s.Label.RequiresExactMatch(label); ok {
result = append(result, MatchValue{IndexName: label, Value: value})
}
}
return result
}