validate storage cache indexers

Signed-off-by: shaloulcy <lcy041536@gmail.com>
This commit is contained in:
shaloulcy 2020-02-05 12:08:14 +08:00
parent 87582e2c3c
commit fa9ba80a67
6 changed files with 94 additions and 10 deletions

View File

@ -45,6 +45,7 @@ go_test(
"//staging/src/k8s.io/apiserver/pkg/storage/names:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/testing:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
],
)

View File

@ -45,6 +45,7 @@ import (
storeerr "k8s.io/apiserver/pkg/storage/errors"
"k8s.io/apiserver/pkg/storage/etcd3/metrics"
"k8s.io/apiserver/pkg/util/dryrun"
"k8s.io/client-go/tools/cache"
"k8s.io/klog"
)
@ -1247,6 +1248,11 @@ func (e *Store) CompleteWithOptions(options *generic.StoreOptions) error {
}
}
err := validateIndexers(options.Indexers)
if err != nil {
return err
}
opts, err := options.RESTOptions.GetRESTOptions(e.DefaultQualifiedResource)
if err != nil {
return err
@ -1372,3 +1378,16 @@ func (e *Store) ConvertToTable(ctx context.Context, object runtime.Object, table
func (e *Store) StorageVersion() runtime.GroupVersioner {
return e.StorageVersioner
}
// validateIndexers will check the prefix of indexers.
func validateIndexers(indexers *cache.Indexers) error {
if indexers == nil {
return nil
}
for indexName := range *indexers {
if len(indexName) <= 2 || (indexName[:2] != "l:" && indexName[:2] != "f:") {
return fmt.Errorf("index must prefix with \"l:\" or \"f:\"")
}
}
return nil
}

View File

@ -54,6 +54,7 @@ import (
"k8s.io/apiserver/pkg/storage/names"
"k8s.io/apiserver/pkg/storage/storagebackend/factory"
storagetesting "k8s.io/apiserver/pkg/storage/testing"
"k8s.io/client-go/tools/cache"
)
var scheme = runtime.NewScheme()
@ -2090,3 +2091,53 @@ func TestRetryDeleteValidation(t *testing.T) {
}
}
}
func emptyIndexFunc(obj interface{}) ([]string, error) {
return []string{}, nil
}
func TestValidateIndexers(t *testing.T) {
testcases := []struct {
name string
indexers *cache.Indexers
expectedError bool
}{
{
name: "nil indexers",
indexers: nil,
expectedError: false,
},
{
name: "normal indexers",
indexers: &cache.Indexers{
"f:spec.nodeName": emptyIndexFunc,
"l:controller-revision-hash": emptyIndexFunc,
},
expectedError: false,
},
{
name: "too short indexers",
indexers: &cache.Indexers{
"f": emptyIndexFunc,
},
expectedError: true,
},
{
name: "invalid indexers",
indexers: &cache.Indexers{
"spec.nodeName": emptyIndexFunc,
},
expectedError: true,
},
}
for _, tc := range testcases {
err := validateIndexers(tc.indexers)
if tc.expectedError && err == nil {
t.Errorf("%v: expected error, but got nil", tc.name)
}
if !tc.expectedError && err != nil {
t.Errorf("%v: expected no error, but got %v", tc.name, err)
}
}
}

View File

@ -356,7 +356,10 @@ func (w *watchCache) WaitUntilFreshAndList(resourceVersion uint64, matchValues [
return nil, 0, err
}
// TODO: use the smallest key size index.
// This isn't the place where we do "final filtering" - only some "prefiltering" is happening here. So the only
// requirement here is to NOT miss anything that should be returned. We can return as many non-matching items as we
// want - they will be filtered out later. The fact that we return less things is only further performance improvement.
// TODO: if multiple indexes match, return the one with the fewest items, so as to do as much filtering as possible.
for _, matchValue := range matchValues {
if result, err := w.store.ByIndex(matchValue.IndexName, matchValue.Value); err == nil {
return result, w.resourceVersion, nil

View File

@ -318,7 +318,7 @@ func TestMarker(t *testing.T) {
func TestWaitUntilFreshAndList(t *testing.T) {
store := newTestWatchCache(3, &cache.Indexers{
"label": func(obj interface{}) ([]string, error) {
"l:label": func(obj interface{}) ([]string, error) {
pod, ok := obj.(*v1.Pod)
if !ok {
return nil, fmt.Errorf("not a pod %#v", obj)
@ -328,7 +328,7 @@ func TestWaitUntilFreshAndList(t *testing.T) {
}
return nil, nil
},
"spec.nodeName": func(obj interface{}) ([]string, error) {
"f:spec.nodeName": func(obj interface{}) ([]string, error) {
pod, ok := obj.(*v1.Pod)
if !ok {
return nil, fmt.Errorf("not a pod %#v", obj)
@ -358,8 +358,8 @@ func TestWaitUntilFreshAndList(t *testing.T) {
// list by label index.
matchValues := []storage.MatchValue{
{IndexName: "label", Value: "value1"},
{IndexName: "spec.nodeName", Value: "node2"},
{IndexName: "l:label", Value: "value1"},
{IndexName: "f:spec.nodeName", Value: "node2"},
}
list, resourceVersion, err = store.WaitUntilFreshAndList(5, matchValues, nil)
if err != nil {
@ -374,8 +374,8 @@ func TestWaitUntilFreshAndList(t *testing.T) {
// list with spec.nodeName index.
matchValues = []storage.MatchValue{
{IndexName: "not-exist-label", Value: "whatever"},
{IndexName: "spec.nodeName", Value: "node2"},
{IndexName: "l:not-exist-label", Value: "whatever"},
{IndexName: "f:spec.nodeName", Value: "node2"},
}
list, resourceVersion, err = store.WaitUntilFreshAndList(5, matchValues, nil)
if err != nil {
@ -390,7 +390,7 @@ func TestWaitUntilFreshAndList(t *testing.T) {
// list with index not exists.
matchValues = []storage.MatchValue{
{IndexName: "not-exist-label", Value: "whatever"},
{IndexName: "l:not-exist-label", Value: "whatever"},
}
list, resourceVersion, err = store.WaitUntilFreshAndList(5, matchValues, nil)
if resourceVersion != 5 {

View File

@ -137,13 +137,23 @@ func (s *SelectionPredicate) MatcherIndex() []MatchValue {
var result []MatchValue
for _, field := range s.IndexFields {
if value, ok := s.Field.RequiresExactMatch(field); ok {
result = append(result, MatchValue{IndexName: field, Value: value})
result = append(result, MatchValue{IndexName: FieldIndex(field), Value: value})
}
}
for _, label := range s.IndexLabels {
if value, ok := s.Label.RequiresExactMatch(label); ok {
result = append(result, MatchValue{IndexName: label, Value: value})
result = append(result, MatchValue{IndexName: LabelIndex(label), Value: value})
}
}
return result
}
// LabelIndex add prefix for label index.
func LabelIndex(label string) string {
return "l:" + label
}
// FiledIndex add prefix for field index.
func FieldIndex(field string) string {
return "f:" + field
}