mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-22 11:21:47 +00:00
Merge pull request #108042 from MadhavJivrajani/cacher-cleanup
cacher: Minor cleanup and refactor of code and tests
This commit is contained in:
commit
56273a6aa3
@ -509,7 +509,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
|
||||
// underlying watchCache is calling processEvent under its lock.
|
||||
c.watchCache.RLock()
|
||||
defer c.watchCache.RUnlock()
|
||||
cacheInterval, err := c.watchCache.GetAllEventsSinceThreadUnsafe(watchRV)
|
||||
cacheInterval, err := c.watchCache.getAllEventsSinceLocked(watchRV)
|
||||
if err != nil {
|
||||
// To match the uncached watch implementation, once we have passed authn/authz/admission,
|
||||
// and successfully parsed a resource version, other errors must fail with a watch event of type ERROR,
|
||||
@ -656,7 +656,7 @@ func (c *Cacher) list(ctx context.Context, key string, opts storage.ListOptions,
|
||||
return c.delegateList(ctx, key, opts, listObj, recursive)
|
||||
}
|
||||
|
||||
trace := utiltrace.New("cacher list", utiltrace.Field{"type", c.objectType.String()})
|
||||
trace := utiltrace.New("cacher list", utiltrace.Field{Key: "type", Value: c.objectType.String()})
|
||||
defer trace.LogIfLong(500 * time.Millisecond)
|
||||
|
||||
c.ready.wait()
|
||||
@ -680,7 +680,7 @@ func (c *Cacher) list(ctx context.Context, key string, opts storage.ListOptions,
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
trace.Step("Listed items from cache", utiltrace.Field{"count", len(objs)})
|
||||
trace.Step("Listed items from cache", utiltrace.Field{Key: "count", Value: len(objs)})
|
||||
if len(objs) > listVal.Cap() && pred.Label.Empty() && pred.Field.Empty() {
|
||||
// Resize the slice appropriately, since we already know that none
|
||||
// of the elements will be filtered out.
|
||||
@ -696,7 +696,7 @@ func (c *Cacher) list(ctx context.Context, key string, opts storage.ListOptions,
|
||||
listVal.Set(reflect.Append(listVal, reflect.ValueOf(elem.Object).Elem()))
|
||||
}
|
||||
}
|
||||
trace.Step("Filtered items", utiltrace.Field{"count", listVal.Len()})
|
||||
trace.Step("Filtered items", utiltrace.Field{Key: "count", Value: listVal.Len()})
|
||||
if c.versioner != nil {
|
||||
if err := c.versioner.UpdateList(listObj, readResourceVersion, "", nil); err != nil {
|
||||
return err
|
||||
@ -992,7 +992,7 @@ func (c *Cacher) finishDispatching() {
|
||||
defer c.Unlock()
|
||||
c.dispatching = false
|
||||
for _, watcher := range c.watchersToStop {
|
||||
watcher.stopThreadUnsafe()
|
||||
watcher.stopLocked()
|
||||
}
|
||||
c.watchersToStop = c.watchersToStop[:0]
|
||||
}
|
||||
@ -1000,14 +1000,14 @@ func (c *Cacher) finishDispatching() {
|
||||
func (c *Cacher) terminateAllWatchers() {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
c.watchers.terminateAll(c.objectType, c.stopWatcherThreadUnsafe)
|
||||
c.watchers.terminateAll(c.objectType, c.stopWatcherLocked)
|
||||
}
|
||||
|
||||
func (c *Cacher) stopWatcherThreadUnsafe(watcher *cacheWatcher) {
|
||||
func (c *Cacher) stopWatcherLocked(watcher *cacheWatcher) {
|
||||
if c.dispatching {
|
||||
c.watchersToStop = append(c.watchersToStop, watcher)
|
||||
} else {
|
||||
watcher.stopThreadUnsafe()
|
||||
watcher.stopLocked()
|
||||
}
|
||||
}
|
||||
|
||||
@ -1037,9 +1037,9 @@ func forgetWatcher(c *Cacher, index int, triggerValue string, triggerSupported b
|
||||
defer c.Unlock()
|
||||
|
||||
// It's possible that the watcher is already not in the structure (e.g. in case of
|
||||
// simultaneous Stop() and terminateAllWatchers(), but it is safe to call stopThreadUnsafe()
|
||||
// simultaneous Stop() and terminateAllWatchers(), but it is safe to call stopLocked()
|
||||
// on a watcher multiple times.
|
||||
c.watchers.deleteWatcher(index, triggerValue, triggerSupported, c.stopWatcherThreadUnsafe)
|
||||
c.watchers.deleteWatcher(index, triggerValue, triggerSupported, c.stopWatcherLocked)
|
||||
}
|
||||
}
|
||||
|
||||
@ -1196,8 +1196,8 @@ func (c *cacheWatcher) Stop() {
|
||||
c.forget()
|
||||
}
|
||||
|
||||
// we rely on the fact that stopThredUnsafe is actually protected by Cacher.Lock()
|
||||
func (c *cacheWatcher) stopThreadUnsafe() {
|
||||
// we rely on the fact that stopLocked is actually protected by Cacher.Lock()
|
||||
func (c *cacheWatcher) stopLocked() {
|
||||
if !c.stopped {
|
||||
c.stopped = true
|
||||
close(c.done)
|
||||
|
@ -65,7 +65,7 @@ func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) {
|
||||
// forget() has to stop the watcher, as only stopping the watcher
|
||||
// triggers stopping the process() goroutine which we are in the
|
||||
// end waiting for in this test.
|
||||
w.stopThreadUnsafe()
|
||||
w.stopLocked()
|
||||
}
|
||||
initEvents := []*watchCacheEvent{
|
||||
{Object: &v1.Pod{}},
|
||||
@ -210,7 +210,7 @@ TestCase:
|
||||
break TestCase
|
||||
default:
|
||||
}
|
||||
w.stopThreadUnsafe()
|
||||
w.stopLocked()
|
||||
}
|
||||
}
|
||||
|
||||
@ -518,7 +518,7 @@ func TestCacheWatcherStoppedInAnotherGoroutine(t *testing.T) {
|
||||
done := make(chan struct{})
|
||||
filter := func(string, labels.Set, fields.Set) bool { return true }
|
||||
forget := func() {
|
||||
w.stopThreadUnsafe()
|
||||
w.stopLocked()
|
||||
done <- struct{}{}
|
||||
}
|
||||
|
||||
@ -541,14 +541,15 @@ func TestCacheWatcherStoppedInAnotherGoroutine(t *testing.T) {
|
||||
for i := 0; i < maxRetriesToProduceTheRaceCondition; i++ {
|
||||
w = newCacheWatcher(2, filter, emptyFunc, testVersioner{}, deadline, false, objectType, "")
|
||||
w.input <- &watchCacheEvent{Object: &v1.Pod{}, ResourceVersion: uint64(i + 1)}
|
||||
ctx, _ := context.WithDeadline(context.Background(), deadline)
|
||||
ctx, cancel := context.WithDeadline(context.Background(), deadline)
|
||||
defer cancel()
|
||||
go w.processInterval(ctx, intervalFromEvents(nil), 0)
|
||||
select {
|
||||
case <-w.ResultChan():
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("expected received a event on ResultChan")
|
||||
}
|
||||
w.stopThreadUnsafe()
|
||||
w.stopLocked()
|
||||
}
|
||||
}
|
||||
|
||||
@ -661,7 +662,8 @@ func TestCacherNoLeakWithMultipleWatchers(t *testing.T) {
|
||||
case <-stopCh:
|
||||
return
|
||||
default:
|
||||
ctx, _ := context.WithTimeout(context.Background(), 3*time.Second)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
|
||||
defer cancel()
|
||||
w, err := cacher.Watch(ctx, "pods/ns", storage.ListOptions{ResourceVersion: "0", Predicate: pred})
|
||||
if err != nil {
|
||||
watchErr = fmt.Errorf("Failed to create watch: %v", err)
|
||||
@ -715,7 +717,8 @@ func TestWatchInitializationSignal(t *testing.T) {
|
||||
}
|
||||
defer cacher.Stop()
|
||||
|
||||
ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
initSignal := utilflowcontrol.NewInitializationSignal()
|
||||
ctx = utilflowcontrol.WithInitializationSignal(ctx, initSignal)
|
||||
|
||||
@ -740,7 +743,8 @@ func testCacherSendBookmarkEvents(t *testing.T, allowWatchBookmarks, expectedBoo
|
||||
pred := storage.Everything
|
||||
pred.AllowWatchBookmarks = allowWatchBookmarks
|
||||
|
||||
ctx, _ := context.WithTimeout(context.Background(), 3*time.Second)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
|
||||
defer cancel()
|
||||
w, err := cacher.Watch(ctx, "pods/ns", storage.ListOptions{ResourceVersion: "0", Predicate: pred})
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create watch: %v", err)
|
||||
@ -852,7 +856,8 @@ func TestCacherSendsMultipleWatchBookmarks(t *testing.T) {
|
||||
t.Fatalf("failed to add a pod: %v", err)
|
||||
}
|
||||
|
||||
ctx, _ := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
w, err := cacher.Watch(ctx, "pods/ns", storage.ListOptions{ResourceVersion: "100", Predicate: pred})
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create watch: %v", err)
|
||||
@ -919,7 +924,8 @@ func TestDispatchingBookmarkEventsWithConcurrentStop(t *testing.T) {
|
||||
for i := 0; i < 1000; i++ {
|
||||
pred := storage.Everything
|
||||
pred.AllowWatchBookmarks = true
|
||||
ctx, _ := context.WithTimeout(context.Background(), time.Second)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
||||
defer cancel()
|
||||
w, err := cacher.Watch(ctx, "pods/ns", storage.ListOptions{ResourceVersion: "999", Predicate: pred})
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create watch: %v", err)
|
||||
@ -1100,26 +1106,23 @@ func TestStartingResourceVersion(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
select {
|
||||
case e, ok := <-watcher.ResultChan():
|
||||
if !ok {
|
||||
t.Errorf("unexpectedly closed watch")
|
||||
break
|
||||
}
|
||||
object := e.Object
|
||||
if co, ok := object.(runtime.CacheableObject); ok {
|
||||
object = co.GetObject()
|
||||
}
|
||||
pod := object.(*examplev1.Pod)
|
||||
podRV, err := cacher.versioner.ParseResourceVersion(pod.ResourceVersion)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
e, ok := <-watcher.ResultChan()
|
||||
if !ok {
|
||||
t.Errorf("unexpectedly closed watch")
|
||||
}
|
||||
object := e.Object
|
||||
if co, ok := object.(runtime.CacheableObject); ok {
|
||||
object = co.GetObject()
|
||||
}
|
||||
pod := object.(*examplev1.Pod)
|
||||
podRV, err := cacher.versioner.ParseResourceVersion(pod.ResourceVersion)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
// event should have at least rv + 1, since we're starting the watch at rv
|
||||
if podRV <= startVersion {
|
||||
t.Errorf("expected event with resourceVersion of at least %d, got %d", startVersion+1, podRV)
|
||||
}
|
||||
// event should have at least rv + 1, since we're starting the watch at rv
|
||||
if podRV <= startVersion {
|
||||
t.Errorf("expected event with resourceVersion of at least %d, got %d", startVersion+1, podRV)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -573,7 +573,21 @@ func (w *watchCache) SetOnReplace(onReplace func()) {
|
||||
w.onReplace = onReplace
|
||||
}
|
||||
|
||||
func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) (*watchCacheInterval, error) {
|
||||
func (w *watchCache) Resync() error {
|
||||
// Nothing to do
|
||||
return nil
|
||||
}
|
||||
|
||||
// isIndexValidLocked checks if a given index is still valid.
|
||||
// This assumes that the lock is held.
|
||||
func (w *watchCache) isIndexValidLocked(index int) bool {
|
||||
return index >= w.startIndex
|
||||
}
|
||||
|
||||
// getAllEventsSinceLocked returns a watchCacheInterval that can be used to
|
||||
// retrieve events since a certain resourceVersion. This function assumes to
|
||||
// be called under the watchCache lock.
|
||||
func (w *watchCache) getAllEventsSinceLocked(resourceVersion uint64) (*watchCacheInterval, error) {
|
||||
size := w.endIndex - w.startIndex
|
||||
var oldest uint64
|
||||
switch {
|
||||
@ -620,14 +634,3 @@ func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) (*wat
|
||||
ci := newCacheInterval(w.startIndex+first, w.endIndex, indexerFunc, w.indexValidator, &w.RWMutex)
|
||||
return ci, nil
|
||||
}
|
||||
|
||||
func (w *watchCache) Resync() error {
|
||||
// Nothing to do
|
||||
return nil
|
||||
}
|
||||
|
||||
// isIndexValidLocked checks if a given index is still valid.
|
||||
// This assumes that the lock is held.
|
||||
func (w *watchCache) isIndexValidLocked(index int) bool {
|
||||
return index >= w.startIndex
|
||||
}
|
||||
|
@ -35,12 +35,12 @@ import (
|
||||
// for starting a watch and reduce the maximum possible time
|
||||
// interval for which the lock would be held while events are
|
||||
// copied over.
|
||||
|
||||
//
|
||||
// The source of events for the interval is typically either
|
||||
// the watchCache circular buffer, if events being retrieved
|
||||
// need to be for resource versions > 0 or the underlying
|
||||
// implementation of Store, if resource version = 0.
|
||||
|
||||
//
|
||||
// Furthermore, an interval can be either valid or invalid at
|
||||
// any given point of time. The notion of validity makes sense
|
||||
// only in cases where the window of events in the underlying
|
||||
|
@ -96,7 +96,7 @@ func (w *testWatchCache) getCacheIntervalForEvents(resourceVersion uint64) (*wat
|
||||
w.RLock()
|
||||
defer w.RUnlock()
|
||||
|
||||
return w.GetAllEventsSinceThreadUnsafe(resourceVersion)
|
||||
return w.getAllEventsSinceLocked(resourceVersion)
|
||||
}
|
||||
|
||||
// newTestWatchCache just adds a fake clock.
|
||||
|
Loading…
Reference in New Issue
Block a user