mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-29 14:37:00 +00:00
Unify List and GetToList functions of cacher.
This commit is contained in:
parent
9b180d8913
commit
c86543d9cb
@ -610,75 +610,40 @@ func shouldDelegateList(opts storage.ListOptions) bool {
|
|||||||
|
|
||||||
// GetToList implements storage.Interface.
|
// GetToList implements storage.Interface.
|
||||||
func (c *Cacher) GetToList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
|
func (c *Cacher) GetToList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
|
||||||
resourceVersion := opts.ResourceVersion
|
return c.list(ctx, key, opts, listObj, false)
|
||||||
pred := opts.Predicate
|
|
||||||
if shouldDelegateList(opts) {
|
|
||||||
return c.storage.GetToList(ctx, key, opts, listObj)
|
|
||||||
}
|
|
||||||
|
|
||||||
// If resourceVersion is specified, serve it from cache.
|
|
||||||
// It's guaranteed that the returned value is at least that
|
|
||||||
// fresh as the given resourceVersion.
|
|
||||||
listRV, err := c.versioner.ParseResourceVersion(resourceVersion)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if listRV == 0 && !c.ready.check() {
|
|
||||||
// If Cacher is not yet initialized and we don't require any specific
|
|
||||||
// minimal resource version, simply forward the request to storage.
|
|
||||||
return c.storage.GetToList(ctx, key, opts, listObj)
|
|
||||||
}
|
|
||||||
|
|
||||||
trace := utiltrace.New("cacher list", utiltrace.Field{"type", c.objectType.String()})
|
|
||||||
defer trace.LogIfLong(500 * time.Millisecond)
|
|
||||||
|
|
||||||
c.ready.wait()
|
|
||||||
trace.Step("Ready")
|
|
||||||
|
|
||||||
// List elements with at least 'listRV' from cache.
|
|
||||||
listPtr, err := meta.GetItemsPtr(listObj)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
listVal, err := conversion.EnforcePtr(listPtr)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if listVal.Kind() != reflect.Slice {
|
|
||||||
return fmt.Errorf("need a pointer to slice, got %v", listVal.Kind())
|
|
||||||
}
|
|
||||||
filter := filterWithAttrsFunction(key, pred)
|
|
||||||
|
|
||||||
obj, exists, readResourceVersion, err := c.watchCache.WaitUntilFreshAndGet(listRV, key, trace)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
trace.Step("Got from cache")
|
|
||||||
|
|
||||||
if exists {
|
|
||||||
elem, ok := obj.(*storeElement)
|
|
||||||
if !ok {
|
|
||||||
return fmt.Errorf("non *storeElement returned from storage: %v", obj)
|
|
||||||
}
|
|
||||||
if filter(elem.Key, elem.Labels, elem.Fields) {
|
|
||||||
listVal.Set(reflect.Append(listVal, reflect.ValueOf(elem.Object).Elem()))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if c.versioner != nil {
|
|
||||||
if err := c.versioner.UpdateList(listObj, readResourceVersion, "", nil); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// List implements storage.Interface.
|
// List implements storage.Interface.
|
||||||
func (c *Cacher) List(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
|
func (c *Cacher) List(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
|
||||||
|
return c.list(ctx, key, opts, listObj, true)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Cacher) delegateList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object, recursive bool) error {
|
||||||
|
if !recursive {
|
||||||
|
return c.storage.GetToList(ctx, key, opts, listObj)
|
||||||
|
}
|
||||||
|
return c.storage.List(ctx, key, opts, listObj)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Cacher) listItems(listRV uint64, key string, pred storage.SelectionPredicate, trace *utiltrace.Trace, recursive bool) ([]interface{}, uint64, string, error) {
|
||||||
|
if !recursive {
|
||||||
|
obj, exists, readResourceVersion, err := c.watchCache.WaitUntilFreshAndGet(listRV, key, trace)
|
||||||
|
if err != nil {
|
||||||
|
return nil, 0, "", err
|
||||||
|
}
|
||||||
|
if exists {
|
||||||
|
return []interface{}{obj}, readResourceVersion, "", nil
|
||||||
|
}
|
||||||
|
return nil, readResourceVersion, "", nil
|
||||||
|
}
|
||||||
|
return c.watchCache.WaitUntilFreshAndList(listRV, pred.MatcherIndex(), trace)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Cacher) list(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object, recursive bool) error {
|
||||||
resourceVersion := opts.ResourceVersion
|
resourceVersion := opts.ResourceVersion
|
||||||
pred := opts.Predicate
|
pred := opts.Predicate
|
||||||
if shouldDelegateList(opts) {
|
if shouldDelegateList(opts) {
|
||||||
return c.storage.List(ctx, key, opts, listObj)
|
return c.delegateList(ctx, key, opts, listObj, recursive)
|
||||||
}
|
}
|
||||||
|
|
||||||
// If resourceVersion is specified, serve it from cache.
|
// If resourceVersion is specified, serve it from cache.
|
||||||
@ -692,7 +657,7 @@ func (c *Cacher) List(ctx context.Context, key string, opts storage.ListOptions,
|
|||||||
if listRV == 0 && !c.ready.check() {
|
if listRV == 0 && !c.ready.check() {
|
||||||
// If Cacher is not yet initialized and we don't require any specific
|
// If Cacher is not yet initialized and we don't require any specific
|
||||||
// minimal resource version, simply forward the request to storage.
|
// minimal resource version, simply forward the request to storage.
|
||||||
return c.storage.List(ctx, key, opts, listObj)
|
return c.delegateList(ctx, key, opts, listObj, recursive)
|
||||||
}
|
}
|
||||||
|
|
||||||
trace := utiltrace.New("cacher list", utiltrace.Field{"type", c.objectType.String()})
|
trace := utiltrace.New("cacher list", utiltrace.Field{"type", c.objectType.String()})
|
||||||
@ -715,7 +680,7 @@ func (c *Cacher) List(ctx context.Context, key string, opts storage.ListOptions,
|
|||||||
}
|
}
|
||||||
filter := filterWithAttrsFunction(key, pred)
|
filter := filterWithAttrsFunction(key, pred)
|
||||||
|
|
||||||
objs, readResourceVersion, indexUsed, err := c.watchCache.WaitUntilFreshAndList(listRV, pred.MatcherIndex(), trace)
|
objs, readResourceVersion, indexUsed, err := c.listItems(listRV, key, pred, trace, recursive)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user