Merge pull request #124692 from linxiulei/watchlist_opt

cacher: apply key for initial events
This commit is contained in:
Kubernetes Prow Robot 2024-06-04 04:22:20 -07:00 committed by GitHub
commit e6d641651a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 27 additions and 13 deletions

View File

@ -300,7 +300,7 @@ func TestResourceVersionAfterInitEvents(t *testing.T) {
store.Add(elem) store.Add(elem)
} }
wci, err := newCacheIntervalFromStore(numObjects, store, getAttrsFunc) wci, err := newCacheIntervalFromStore(numObjects, store, getAttrsFunc, "", false)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -622,7 +622,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
defer c.watchCache.RUnlock() defer c.watchCache.RUnlock()
var cacheInterval *watchCacheInterval var cacheInterval *watchCacheInterval
cacheInterval, err = c.watchCache.getAllEventsSinceLocked(requiredResourceVersion, opts) cacheInterval, err = c.watchCache.getAllEventsSinceLocked(requiredResourceVersion, key, opts)
if err != nil { if err != nil {
// To match the uncached watch implementation, once we have passed authn/authz/admission, // To match the uncached watch implementation, once we have passed authn/authz/admission,
// and successfully parsed a resource version, other errors must fail with a watch event of type ERROR, // and successfully parsed a resource version, other errors must fail with a watch event of type ERROR,

View File

@ -712,9 +712,10 @@ func (w *watchCache) isIndexValidLocked(index int) bool {
// getAllEventsSinceLocked returns a watchCacheInterval that can be used to // getAllEventsSinceLocked returns a watchCacheInterval that can be used to
// retrieve events since a certain resourceVersion. This function assumes to // retrieve events since a certain resourceVersion. This function assumes to
// be called under the watchCache lock. // be called under the watchCache lock.
func (w *watchCache) getAllEventsSinceLocked(resourceVersion uint64, opts storage.ListOptions) (*watchCacheInterval, error) { func (w *watchCache) getAllEventsSinceLocked(resourceVersion uint64, key string, opts storage.ListOptions) (*watchCacheInterval, error) {
_, matchesSingle := opts.Predicate.MatchesSingle()
if opts.SendInitialEvents != nil && *opts.SendInitialEvents { if opts.SendInitialEvents != nil && *opts.SendInitialEvents {
return w.getIntervalFromStoreLocked() return w.getIntervalFromStoreLocked(key, matchesSingle)
} }
size := w.endIndex - w.startIndex size := w.endIndex - w.startIndex
@ -743,7 +744,7 @@ func (w *watchCache) getAllEventsSinceLocked(resourceVersion uint64, opts storag
// current state and only then start watching from that point. // current state and only then start watching from that point.
// //
// TODO: In v2 api, we should stop returning the current state - #13969. // TODO: In v2 api, we should stop returning the current state - #13969.
return w.getIntervalFromStoreLocked() return w.getIntervalFromStoreLocked(key, matchesSingle)
} }
// SendInitialEvents = false and resourceVersion = 0 // SendInitialEvents = false and resourceVersion = 0
// means that the request would like to start watching // means that the request would like to start watching
@ -769,8 +770,8 @@ func (w *watchCache) getAllEventsSinceLocked(resourceVersion uint64, opts storag
// getIntervalFromStoreLocked returns a watchCacheInterval // getIntervalFromStoreLocked returns a watchCacheInterval
// that covers the entire storage state. // that covers the entire storage state.
// This function assumes to be called under the watchCache lock. // This function assumes to be called under the watchCache lock.
func (w *watchCache) getIntervalFromStoreLocked() (*watchCacheInterval, error) { func (w *watchCache) getIntervalFromStoreLocked(key string, matchesSingle bool) (*watchCacheInterval, error) {
ci, err := newCacheIntervalFromStore(w.resourceVersion, w.store, w.getAttrsFunc) ci, err := newCacheIntervalFromStore(w.resourceVersion, w.store, w.getAttrsFunc, key, matchesSingle)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -133,9 +133,22 @@ func (s sortableWatchCacheEvents) Swap(i, j int) {
// returned by Next() need to be events from a List() done on the underlying store of // returned by Next() need to be events from a List() done on the underlying store of
// the watch cache. // the watch cache.
// The items returned in the interval will be sorted by Key. // The items returned in the interval will be sorted by Key.
func newCacheIntervalFromStore(resourceVersion uint64, store cache.Indexer, getAttrsFunc attrFunc) (*watchCacheInterval, error) { func newCacheIntervalFromStore(resourceVersion uint64, store cache.Indexer, getAttrsFunc attrFunc, key string, matchesSingle bool) (*watchCacheInterval, error) {
buffer := &watchCacheIntervalBuffer{} buffer := &watchCacheIntervalBuffer{}
allItems := store.List() var allItems []interface{}
if matchesSingle {
item, exists, err := store.GetByKey(key)
if err != nil {
return nil, err
}
if exists {
allItems = append(allItems, item)
}
} else {
allItems = store.List()
}
buffer.buffer = make([]*watchCacheEvent, len(allItems)) buffer.buffer = make([]*watchCacheEvent, len(allItems))
for i, item := range allItems { for i, item := range allItems {
elem, ok := item.(*storeElement) elem, ok := item.(*storeElement)

View File

@ -391,7 +391,7 @@ func TestCacheIntervalNextFromStore(t *testing.T) {
store.Add(elem) store.Add(elem)
} }
wci, err := newCacheIntervalFromStore(rv, store, getAttrsFunc) wci, err := newCacheIntervalFromStore(rv, store, getAttrsFunc, "", false)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -105,7 +105,7 @@ func (w *testWatchCache) getCacheIntervalForEvents(resourceVersion uint64, opts
w.RLock() w.RLock()
defer w.RUnlock() defer w.RUnlock()
return w.getAllEventsSinceLocked(resourceVersion, opts) return w.getAllEventsSinceLocked(resourceVersion, "", opts)
} }
// newTestWatchCache just adds a fake clock. // newTestWatchCache just adds a fake clock.

View File

@ -118,7 +118,7 @@ func (s *SelectionPredicate) MatchesObjectAttributes(l labels.Set, f fields.Set)
// MatchesSingleNamespace will return (namespace, true) if and only if s.Field matches on the object's // MatchesSingleNamespace will return (namespace, true) if and only if s.Field matches on the object's
// namespace. // namespace.
func (s *SelectionPredicate) MatchesSingleNamespace() (string, bool) { func (s *SelectionPredicate) MatchesSingleNamespace() (string, bool) {
if len(s.Continue) > 0 { if len(s.Continue) > 0 || s.Field == nil {
return "", false return "", false
} }
if namespace, ok := s.Field.RequiresExactMatch("metadata.namespace"); ok { if namespace, ok := s.Field.RequiresExactMatch("metadata.namespace"); ok {
@ -130,7 +130,7 @@ func (s *SelectionPredicate) MatchesSingleNamespace() (string, bool) {
// MatchesSingle will return (name, true) if and only if s.Field matches on the object's // MatchesSingle will return (name, true) if and only if s.Field matches on the object's
// name. // name.
func (s *SelectionPredicate) MatchesSingle() (string, bool) { func (s *SelectionPredicate) MatchesSingle() (string, bool) {
if len(s.Continue) > 0 { if len(s.Continue) > 0 || s.Field == nil {
return "", false return "", false
} }
// TODO: should be namespace.name // TODO: should be namespace.name