mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-09 03:57:41 +00:00
apiserver/storage/watchcache: WaitUntilFreshAndList supports path prefix
This commit is contained in:
parent
c259fe2342
commit
2f9660db6b
@ -615,7 +615,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
|
|||||||
// to compute watcher.forget function (which has to happen under lock).
|
// to compute watcher.forget function (which has to happen under lock).
|
||||||
watcher := newCacheWatcher(
|
watcher := newCacheWatcher(
|
||||||
chanSize,
|
chanSize,
|
||||||
filterWithAttrsFunction(key, pred),
|
filterWithAttrsAndPrefixFunction(key, pred),
|
||||||
emptyFunc,
|
emptyFunc,
|
||||||
c.versioner,
|
c.versioner,
|
||||||
deadline,
|
deadline,
|
||||||
@ -809,7 +809,7 @@ func (c *Cacher) listItems(ctx context.Context, listRV uint64, key string, pred
|
|||||||
}
|
}
|
||||||
return nil, readResourceVersion, "", nil
|
return nil, readResourceVersion, "", nil
|
||||||
}
|
}
|
||||||
return c.watchCache.WaitUntilFreshAndList(ctx, listRV, pred.MatcherIndex(ctx))
|
return c.watchCache.WaitUntilFreshAndList(ctx, listRV, key, pred.MatcherIndex(ctx))
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetList implements storage.Interface
|
// GetList implements storage.Interface
|
||||||
@ -885,7 +885,6 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
|
|||||||
if listVal.Kind() != reflect.Slice {
|
if listVal.Kind() != reflect.Slice {
|
||||||
return fmt.Errorf("need a pointer to slice, got %v", listVal.Kind())
|
return fmt.Errorf("need a pointer to slice, got %v", listVal.Kind())
|
||||||
}
|
}
|
||||||
filter := filterWithAttrsFunction(preparedKey, pred)
|
|
||||||
|
|
||||||
objs, readResourceVersion, indexUsed, err := c.listItems(ctx, listRV, preparedKey, pred, recursive)
|
objs, readResourceVersion, indexUsed, err := c.listItems(ctx, listRV, preparedKey, pred, recursive)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -905,7 +904,7 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
|
|||||||
if !ok {
|
if !ok {
|
||||||
return fmt.Errorf("non *storeElement returned from storage: %v", obj)
|
return fmt.Errorf("non *storeElement returned from storage: %v", obj)
|
||||||
}
|
}
|
||||||
if filter(elem.Key, elem.Labels, elem.Fields) {
|
if pred.MatchesObjectAttributes(elem.Labels, elem.Fields) {
|
||||||
selectedObjects = append(selectedObjects, elem.Object)
|
selectedObjects = append(selectedObjects, elem.Object)
|
||||||
lastSelectedObjectKey = elem.Key
|
lastSelectedObjectKey = elem.Key
|
||||||
}
|
}
|
||||||
@ -1320,7 +1319,7 @@ func forgetWatcher(c *Cacher, w *cacheWatcher, index int, scope namespacedName,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func filterWithAttrsFunction(key string, p storage.SelectionPredicate) filterWithAttrsFunc {
|
func filterWithAttrsAndPrefixFunction(key string, p storage.SelectionPredicate) filterWithAttrsFunc {
|
||||||
filterFunc := func(objKey string, label labels.Set, field fields.Set) bool {
|
filterFunc := func(objKey string, label labels.Set, field fields.Set) bool {
|
||||||
if !hasPathPrefix(objKey, key) {
|
if !hasPathPrefix(objKey, key) {
|
||||||
return false
|
return false
|
||||||
|
@ -501,7 +501,29 @@ func (s sortableStoreElements) Swap(i, j int) {
|
|||||||
|
|
||||||
// WaitUntilFreshAndList returns list of pointers to `storeElement` objects along
|
// WaitUntilFreshAndList returns list of pointers to `storeElement` objects along
|
||||||
// with their ResourceVersion and the name of the index, if any, that was used.
|
// with their ResourceVersion and the name of the index, if any, that was used.
|
||||||
func (w *watchCache) WaitUntilFreshAndList(ctx context.Context, resourceVersion uint64, matchValues []storage.MatchValue) (result []interface{}, rv uint64, index string, err error) {
|
func (w *watchCache) WaitUntilFreshAndList(ctx context.Context, resourceVersion uint64, key string, matchValues []storage.MatchValue) ([]interface{}, uint64, string, error) {
|
||||||
|
items, rv, index, err := w.waitUntilFreshAndListItems(ctx, resourceVersion, key, matchValues)
|
||||||
|
if err != nil {
|
||||||
|
return nil, 0, "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
var result []interface{}
|
||||||
|
for _, item := range items {
|
||||||
|
elem, ok := item.(*storeElement)
|
||||||
|
if !ok {
|
||||||
|
return nil, 0, "", fmt.Errorf("non *storeElement returned from storage: %v", item)
|
||||||
|
}
|
||||||
|
if !hasPathPrefix(elem.Key, key) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
result = append(result, item)
|
||||||
|
}
|
||||||
|
|
||||||
|
sort.Sort(sortableStoreElements(result))
|
||||||
|
return result, rv, index, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *watchCache) waitUntilFreshAndListItems(ctx context.Context, resourceVersion uint64, key string, matchValues []storage.MatchValue) (result []interface{}, rv uint64, index string, err error) {
|
||||||
requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress)
|
requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress)
|
||||||
if utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) && requestWatchProgressSupported && w.notFresh(resourceVersion) {
|
if utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) && requestWatchProgressSupported && w.notFresh(resourceVersion) {
|
||||||
w.waitingUntilFresh.Add()
|
w.waitingUntilFresh.Add()
|
||||||
@ -511,7 +533,6 @@ func (w *watchCache) WaitUntilFreshAndList(ctx context.Context, resourceVersion
|
|||||||
err = w.waitUntilFreshAndBlock(ctx, resourceVersion)
|
err = w.waitUntilFreshAndBlock(ctx, resourceVersion)
|
||||||
}
|
}
|
||||||
|
|
||||||
defer func() { sort.Sort(sortableStoreElements(result)) }()
|
|
||||||
defer w.RUnlock()
|
defer w.RUnlock()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return result, rv, index, err
|
return result, rv, index, err
|
||||||
|
@ -462,7 +462,7 @@ func TestWaitUntilFreshAndList(t *testing.T) {
|
|||||||
}()
|
}()
|
||||||
|
|
||||||
// list by empty MatchValues.
|
// list by empty MatchValues.
|
||||||
list, resourceVersion, indexUsed, err := store.WaitUntilFreshAndList(ctx, 5, nil)
|
list, resourceVersion, indexUsed, err := store.WaitUntilFreshAndList(ctx, 5, "prefix/", nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unexpected error: %v", err)
|
t.Fatalf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
@ -481,7 +481,7 @@ func TestWaitUntilFreshAndList(t *testing.T) {
|
|||||||
{IndexName: "l:label", Value: "value1"},
|
{IndexName: "l:label", Value: "value1"},
|
||||||
{IndexName: "f:spec.nodeName", Value: "node2"},
|
{IndexName: "f:spec.nodeName", Value: "node2"},
|
||||||
}
|
}
|
||||||
list, resourceVersion, indexUsed, err = store.WaitUntilFreshAndList(ctx, 5, matchValues)
|
list, resourceVersion, indexUsed, err = store.WaitUntilFreshAndList(ctx, 5, "prefix/", matchValues)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unexpected error: %v", err)
|
t.Fatalf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
@ -500,7 +500,7 @@ func TestWaitUntilFreshAndList(t *testing.T) {
|
|||||||
{IndexName: "l:not-exist-label", Value: "whatever"},
|
{IndexName: "l:not-exist-label", Value: "whatever"},
|
||||||
{IndexName: "f:spec.nodeName", Value: "node2"},
|
{IndexName: "f:spec.nodeName", Value: "node2"},
|
||||||
}
|
}
|
||||||
list, resourceVersion, indexUsed, err = store.WaitUntilFreshAndList(ctx, 5, matchValues)
|
list, resourceVersion, indexUsed, err = store.WaitUntilFreshAndList(ctx, 5, "prefix/", matchValues)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unexpected error: %v", err)
|
t.Fatalf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
@ -518,7 +518,7 @@ func TestWaitUntilFreshAndList(t *testing.T) {
|
|||||||
matchValues = []storage.MatchValue{
|
matchValues = []storage.MatchValue{
|
||||||
{IndexName: "l:not-exist-label", Value: "whatever"},
|
{IndexName: "l:not-exist-label", Value: "whatever"},
|
||||||
}
|
}
|
||||||
list, resourceVersion, indexUsed, err = store.WaitUntilFreshAndList(ctx, 5, matchValues)
|
list, resourceVersion, indexUsed, err = store.WaitUntilFreshAndList(ctx, 5, "prefix/", matchValues)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unexpected error: %v", err)
|
t.Fatalf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
@ -546,7 +546,7 @@ func TestWaitUntilFreshAndListFromCache(t *testing.T) {
|
|||||||
}()
|
}()
|
||||||
|
|
||||||
// list from future revision. Requires watch cache to request bookmark to get it.
|
// list from future revision. Requires watch cache to request bookmark to get it.
|
||||||
list, resourceVersion, indexUsed, err := store.WaitUntilFreshAndList(ctx, 3, nil)
|
list, resourceVersion, indexUsed, err := store.WaitUntilFreshAndList(ctx, 3, "prefix/", nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unexpected error: %v", err)
|
t.Fatalf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
@ -626,7 +626,7 @@ func TestWaitUntilFreshAndListTimeout(t *testing.T) {
|
|||||||
store.Add(makeTestPod("bar", 4))
|
store.Add(makeTestPod("bar", 4))
|
||||||
}()
|
}()
|
||||||
|
|
||||||
_, _, _, err := store.WaitUntilFreshAndList(ctx, 4, nil)
|
_, _, _, err := store.WaitUntilFreshAndList(ctx, 4, "", nil)
|
||||||
if !errors.IsTimeout(err) {
|
if !errors.IsTimeout(err) {
|
||||||
t.Errorf("expected timeout error but got: %v", err)
|
t.Errorf("expected timeout error but got: %v", err)
|
||||||
}
|
}
|
||||||
@ -655,7 +655,7 @@ func TestReflectorForWatchCache(t *testing.T) {
|
|||||||
defer store.Stop()
|
defer store.Stop()
|
||||||
|
|
||||||
{
|
{
|
||||||
_, version, _, err := store.WaitUntilFreshAndList(ctx, 0, nil)
|
_, version, _, err := store.WaitUntilFreshAndList(ctx, 0, "", nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unexpected error: %v", err)
|
t.Fatalf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
@ -678,7 +678,7 @@ func TestReflectorForWatchCache(t *testing.T) {
|
|||||||
r.ListAndWatch(wait.NeverStop)
|
r.ListAndWatch(wait.NeverStop)
|
||||||
|
|
||||||
{
|
{
|
||||||
_, version, _, err := store.WaitUntilFreshAndList(ctx, 10, nil)
|
_, version, _, err := store.WaitUntilFreshAndList(ctx, 10, "", nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unexpected error: %v", err)
|
t.Fatalf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user