diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index 90c5fbdafb4..dfd620a3ca1 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -462,7 +462,9 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions return nil, err } - c.ready.wait() + if err := c.ready.wait(); err != nil { + return nil, errors.NewServiceUnavailable(err.Error()) + } triggerValue, triggerSupported := "", false if c.indexedTrigger != nil { @@ -559,7 +561,9 @@ func (c *Cacher) Get(ctx context.Context, key string, opts storage.GetOptions, o // Do not create a trace - it's not for free and there are tons // of Get requests. We can add it if it will be really needed. - c.ready.wait() + if err := c.ready.wait(); err != nil { + return errors.NewServiceUnavailable(err.Error()) + } objVal, err := conversion.EnforcePtr(objPtr) if err != nil { @@ -644,7 +648,9 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio trace := utiltrace.New("cacher list", utiltrace.Field{Key: "type", Value: c.objectType.String()}) defer trace.LogIfLong(500 * time.Millisecond) - c.ready.wait() + if err := c.ready.wait(); err != nil { + return errors.NewServiceUnavailable(err.Error()) + } trace.Step("Ready") // List elements with at least 'listRV' from cache. @@ -1011,6 +1017,7 @@ func (c *Cacher) Stop() { return } c.stopped = true + c.ready.stop() c.stopLock.Unlock() close(c.stopCh) c.stopWg.Wait() @@ -1040,7 +1047,9 @@ func filterWithAttrsFunction(key string, p storage.SelectionPredicate) filterWit // LastSyncResourceVersion returns resource version to which the underlying cache is synced. func (c *Cacher) LastSyncResourceVersion() (uint64, error) { - c.ready.wait() + if err := c.ready.wait(); err != nil { + return 0, errors.NewServiceUnavailable(err.Error()) + } resourceVersion := c.reflector.LastSyncResourceVersion() return c.versioner.ParseResourceVersion(resourceVersion) @@ -1426,36 +1435,3 @@ func (c *cacheWatcher) process(ctx context.Context, resourceVersion uint64) { } } } - -type ready struct { - ok bool - c *sync.Cond -} - -func newReady() *ready { - return &ready{c: sync.NewCond(&sync.RWMutex{})} -} - -func (r *ready) wait() { - r.c.L.Lock() - for !r.ok { - r.c.Wait() - } - r.c.L.Unlock() -} - -// TODO: Make check() function more sophisticated, in particular -// allow it to behave as "waitWithTimeout". -func (r *ready) check() bool { - rwMutex := r.c.L.(*sync.RWMutex) - rwMutex.RLock() - defer rwMutex.RUnlock() - return r.ok -} - -func (r *ready) set(ok bool) { - r.c.L.Lock() - defer r.c.L.Unlock() - r.ok = ok - r.c.Broadcast() -} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go index b71260102e2..5f0ba257a6b 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go @@ -337,7 +337,9 @@ func TestGetListCacheBypass(t *testing.T) { result := &example.PodList{} // Wait until cacher is initialized. - cacher.ready.wait() + if err := cacher.ready.wait(); err != nil { + t.Fatalf("unexpected error waiting for the cache to be ready") + } // Inject error to underlying layer and check if cacher is not bypassed. backingStorage.err = errDummy @@ -374,7 +376,9 @@ func TestGetListNonRecursiveCacheBypass(t *testing.T) { result := &example.PodList{} // Wait until cacher is initialized. - cacher.ready.wait() + if err := cacher.ready.wait(); err != nil { + t.Fatalf("unexpected error waiting for the cache to be ready") + } // Inject error to underlying layer and check if cacher is not bypassed. backingStorage.err = errDummy @@ -406,7 +410,9 @@ func TestGetCacheBypass(t *testing.T) { result := &example.Pod{} // Wait until cacher is initialized. - cacher.ready.wait() + if err := cacher.ready.wait(); err != nil { + t.Fatalf("unexpected error waiting for the cache to be ready") + } // Inject error to underlying layer and check if cacher is not bypassed. backingStorage.err = errDummy @@ -436,7 +442,9 @@ func TestWatcherNotGoingBackInTime(t *testing.T) { defer cacher.Stop() // Wait until cacher is initialized. - cacher.ready.wait() + if err := cacher.ready.wait(); err != nil { + t.Fatalf("unexpected error waiting for the cache to be ready") + } // Ensure there is some budget for slowing down processing. cacher.dispatchTimeoutBudget.returnUnused(100 * time.Millisecond) @@ -561,7 +569,9 @@ func TestCacheWatcherStoppedOnDestroy(t *testing.T) { defer cacher.Stop() // Wait until cacher is initialized. - cacher.ready.wait() + if err := cacher.ready.wait(); err != nil { + t.Fatalf("unexpected error waiting for the cache to be ready") + } w, err := cacher.Watch(context.Background(), "pods/ns", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything}) if err != nil { @@ -591,6 +601,68 @@ func TestCacheWatcherStoppedOnDestroy(t *testing.T) { } +func TestCacheDontAcceptRequestsStopped(t *testing.T) { + backingStorage := &dummyStorage{} + cacher, _, err := newTestCacher(backingStorage) + if err != nil { + t.Fatalf("Couldn't create cacher: %v", err) + } + + // Wait until cacher is initialized. + if err := cacher.ready.wait(); err != nil { + t.Fatalf("unexpected error waiting for the cache to be ready") + } + + w, err := cacher.Watch(context.Background(), "pods/ns", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything}) + if err != nil { + t.Fatalf("Failed to create watch: %v", err) + } + + watchClosed := make(chan struct{}) + go func() { + defer close(watchClosed) + for event := range w.ResultChan() { + switch event.Type { + case watch.Added, watch.Modified, watch.Deleted: + // ok + default: + t.Errorf("unexpected event %#v", event) + } + } + }() + + cacher.Stop() + + _, err = cacher.Watch(context.Background(), "pods/ns", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything}) + if err == nil { + t.Fatalf("Success to create Watch: %v", err) + } + + result := &example.Pod{} + err = cacher.Get(context.TODO(), "pods/ns/pod-0", storage.GetOptions{ + IgnoreNotFound: true, + ResourceVersion: "1", + }, result) + if err == nil { + t.Fatalf("Success to create Get: %v", err) + } + + err = cacher.GetList(context.TODO(), "pods/ns", storage.ListOptions{ + ResourceVersion: "1", + Recursive: true, + }, result) + if err == nil { + t.Fatalf("Success to create GetList: %v", err) + } + + select { + case <-watchClosed: + case <-time.After(wait.ForeverTestTimeout): + t.Errorf("timed out waiting for watch to close") + } + +} + func TestTimeBucketWatchersBasic(t *testing.T) { filter := func(_ string, _ labels.Set, _ fields.Set) bool { return true @@ -642,7 +714,9 @@ func TestCacherNoLeakWithMultipleWatchers(t *testing.T) { defer cacher.Stop() // Wait until cacher is initialized. - cacher.ready.wait() + if err := cacher.ready.wait(); err != nil { + t.Fatalf("unexpected error waiting for the cache to be ready") + } pred := storage.Everything pred.AllowWatchBookmarks = true @@ -738,7 +812,9 @@ func testCacherSendBookmarkEvents(t *testing.T, allowWatchBookmarks, expectedBoo defer cacher.Stop() // Wait until cacher is initialized. - cacher.ready.wait() + if err := cacher.ready.wait(); err != nil { + t.Fatalf("unexpected error waiting for the cache to be ready") + } pred := storage.Everything pred.AllowWatchBookmarks = allowWatchBookmarks @@ -836,7 +912,9 @@ func TestCacherSendsMultipleWatchBookmarks(t *testing.T) { cacher.bookmarkWatchers.bookmarkFrequency = time.Second // Wait until cacher is initialized. - cacher.ready.wait() + if err := cacher.ready.wait(); err != nil { + t.Fatalf("unexpected error waiting for the cache to be ready") + } pred := storage.Everything pred.AllowWatchBookmarks = true @@ -904,7 +982,9 @@ func TestDispatchingBookmarkEventsWithConcurrentStop(t *testing.T) { defer cacher.Stop() // Wait until cacher is initialized. - cacher.ready.wait() + if err := cacher.ready.wait(); err != nil { + t.Fatalf("unexpected error waiting for the cache to be ready") + } // Ensure there is some budget for slowing down processing. cacher.dispatchTimeoutBudget.returnUnused(100 * time.Millisecond) @@ -980,7 +1060,9 @@ func TestBookmarksOnResourceVersionUpdates(t *testing.T) { cacher.bookmarkWatchers = newTimeBucketWatchers(clock.RealClock{}, 2*time.Second) // Wait until cacher is initialized. - cacher.ready.wait() + if err := cacher.ready.wait(); err != nil { + t.Fatalf("unexpected error waiting for the cache to be ready") + } makePod := func(i int) *examplev1.Pod { return &examplev1.Pod{ @@ -1056,7 +1138,9 @@ func TestStartingResourceVersion(t *testing.T) { defer cacher.Stop() // Wait until cacher is initialized. - cacher.ready.wait() + if err := cacher.ready.wait(); err != nil { + t.Fatalf("unexpected error waiting for the cache to be ready") + } // Ensure there is some budget for slowing down processing. // We use the fakeTimeBudget to prevent this test from flaking under @@ -1134,7 +1218,9 @@ func TestDispatchEventWillNotBeBlockedByTimedOutWatcher(t *testing.T) { defer cacher.Stop() // Wait until cacher is initialized. - cacher.ready.wait() + if err := cacher.ready.wait(); err != nil { + t.Fatalf("unexpected error waiting for the cache to be ready") + } // Ensure there is some budget for slowing down processing. // We use the fakeTimeBudget to prevent this test from flaking under @@ -1243,7 +1329,9 @@ func TestCachingDeleteEvents(t *testing.T) { defer cacher.Stop() // Wait until cacher is initialized. - cacher.ready.wait() + if err := cacher.ready.wait(); err != nil { + t.Fatalf("unexpected error waiting for the cache to be ready") + } fooPredicate := storage.SelectionPredicate{ Label: labels.SelectorFromSet(map[string]string{"foo": "true"}), @@ -1323,7 +1411,9 @@ func testCachingObjects(t *testing.T, watchersCount int) { defer cacher.Stop() // Wait until cacher is initialized. - cacher.ready.wait() + if err := cacher.ready.wait(); err != nil { + t.Fatalf("unexpected error waiting for the cache to be ready") + } dispatchedEvents := []*watchCacheEvent{} cacher.watchCache.eventHandler = func(event *watchCacheEvent) { @@ -1417,7 +1507,9 @@ func TestCacheIntervalInvalidationStopsWatch(t *testing.T) { defer cacher.Stop() // Wait until cacher is initialized. - cacher.ready.wait() + if err := cacher.ready.wait(); err != nil { + t.Fatalf("unexpected error waiting for the cache to be ready") + } // Ensure there is enough budget for slow processing since // the entire watch cache is going to be served through the // interval and events won't be popped from the cacheWatcher's diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/ready.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/ready.go new file mode 100644 index 00000000000..8278dd2b2f6 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/ready.go @@ -0,0 +1,96 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cacher + +import ( + "fmt" + "sync" +) + +type status int + +const ( + Pending status = iota + Ready + Stopped +) + +// ready is a three state condition variable that blocks until is Ready if is not Stopped. +// Its initial state is Pending. +type ready struct { + state status + c *sync.Cond +} + +func newReady() *ready { + return &ready{ + c: sync.NewCond(&sync.RWMutex{}), + state: Pending, + } +} + +// wait blocks until it is Ready or Stopped, it returns an error if is Stopped. +func (r *ready) wait() error { + r.c.L.Lock() + defer r.c.L.Unlock() + for r.state == Pending { + r.c.Wait() + } + switch r.state { + case Ready: + return nil + case Stopped: + return fmt.Errorf("apiserver cacher is stopped") + default: + return fmt.Errorf("unexpected apiserver cache state: %v", r.state) + } +} + +// check returns true only if it is Ready. +func (r *ready) check() bool { + // TODO: Make check() function more sophisticated, in particular + // allow it to behave as "waitWithTimeout". + rwMutex := r.c.L.(*sync.RWMutex) + rwMutex.RLock() + defer rwMutex.RUnlock() + return r.state == Ready +} + +// set the state to Pending (false) or Ready (true), it does not have effect if the state is Stopped. +func (r *ready) set(ok bool) { + r.c.L.Lock() + defer r.c.L.Unlock() + if r.state == Stopped { + return + } + if ok { + r.state = Ready + } else { + r.state = Pending + } + r.c.Broadcast() +} + +// stop the condition variable and set it as Stopped. This state is irreversible. +func (r *ready) stop() { + r.c.L.Lock() + defer r.c.L.Unlock() + if r.state != Stopped { + r.state = Stopped + r.c.Broadcast() + } +} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/ready_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/ready_test.go new file mode 100644 index 00000000000..c14a7068a1c --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/ready_test.go @@ -0,0 +1,83 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cacher + +import ( + "testing" +) + +func Test_newReady(t *testing.T) { + errCh := make(chan error, 10) + ready := newReady() + ready.set(false) + // create 10 goroutines waiting for ready + for i := 0; i < 10; i++ { + go func() { + errCh <- ready.wait() + }() + } + ready.set(true) + for i := 0; i < 10; i++ { + if err := <-errCh; err != nil { + t.Errorf("unexpected error on channel %d", i) + } + } +} + +func Test_newReadyStop(t *testing.T) { + errCh := make(chan error, 10) + ready := newReady() + ready.set(false) + // create 10 goroutines waiting for ready and stop + for i := 0; i < 10; i++ { + go func() { + errCh <- ready.wait() + }() + } + ready.stop() + for i := 0; i < 10; i++ { + if err := <-errCh; err == nil { + t.Errorf("unexpected success on channel %d", i) + } + } +} + +func Test_newReadyCheck(t *testing.T) { + ready := newReady() + // it starts as false + if ready.check() { + t.Errorf("unexpected ready state %v", ready.check()) + } + ready.set(true) + if !ready.check() { + t.Errorf("unexpected ready state %v", ready.check()) + } + // stop sets ready to false + ready.stop() + if ready.check() { + t.Errorf("unexpected ready state %v", ready.check()) + } + // can not set to true if is stopped + ready.set(true) + if ready.check() { + t.Errorf("unexpected ready state %v", ready.check()) + } + err := ready.wait() + if err == nil { + t.Errorf("expected error waiting on a stopped state") + } +}