From 2cb3a56e83ae33464edb174b1b6373ba50600759 Mon Sep 17 00:00:00 2001 From: Antonio Ojea Date: Thu, 3 Mar 2022 12:01:59 +0100 Subject: [PATCH] apiserver cacher: don't accept requests if stopped The cacher blocks requests until it is ready, however, the ready variable doesn't differentiate if the cacher was stopped. The cacher is using a condition variable based on sync.Cond to handle the readiness, however, this was not taking into account if it was not ready because it was waiting to be ready or it was stopped. Add a new condition to the condition variable to handle the stop condition, and returning an error to signal the goroutines that they should stop waiting and bail out. --- .../apiserver/pkg/storage/cacher/cacher.go | 50 ++----- .../storage/cacher/cacher_whitebox_test.go | 122 +++++++++++++++--- .../apiserver/pkg/storage/cacher/ready.go | 96 ++++++++++++++ .../pkg/storage/cacher/ready_test.go | 83 ++++++++++++ 4 files changed, 299 insertions(+), 52 deletions(-) create mode 100644 staging/src/k8s.io/apiserver/pkg/storage/cacher/ready.go create mode 100644 staging/src/k8s.io/apiserver/pkg/storage/cacher/ready_test.go diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index 84940ab0cd7..ad7b99b5951 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -462,7 +462,9 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions return nil, err } - c.ready.wait() + if err := c.ready.wait(); err != nil { + return nil, errors.NewServiceUnavailable(err.Error()) + } triggerValue, triggerSupported := "", false if c.indexedTrigger != nil { @@ -559,7 +561,9 @@ func (c *Cacher) Get(ctx context.Context, key string, opts storage.GetOptions, o // Do not create a trace - it's not for free and there are tons // of Get requests. We can add it if it will be really needed. - c.ready.wait() + if err := c.ready.wait(); err != nil { + return errors.NewServiceUnavailable(err.Error()) + } objVal, err := conversion.EnforcePtr(objPtr) if err != nil { @@ -644,7 +648,9 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio trace := utiltrace.New("cacher list", utiltrace.Field{Key: "type", Value: c.objectType.String()}) defer trace.LogIfLong(500 * time.Millisecond) - c.ready.wait() + if err := c.ready.wait(); err != nil { + return errors.NewServiceUnavailable(err.Error()) + } trace.Step("Ready") // List elements with at least 'listRV' from cache. @@ -1011,6 +1017,7 @@ func (c *Cacher) Stop() { return } c.stopped = true + c.ready.stop() c.stopLock.Unlock() close(c.stopCh) c.stopWg.Wait() @@ -1040,7 +1047,9 @@ func filterWithAttrsFunction(key string, p storage.SelectionPredicate) filterWit // LastSyncResourceVersion returns resource version to which the underlying cache is synced. func (c *Cacher) LastSyncResourceVersion() (uint64, error) { - c.ready.wait() + if err := c.ready.wait(); err != nil { + return 0, errors.NewServiceUnavailable(err.Error()) + } resourceVersion := c.reflector.LastSyncResourceVersion() return c.versioner.ParseResourceVersion(resourceVersion) @@ -1426,36 +1435,3 @@ func (c *cacheWatcher) process(ctx context.Context, resourceVersion uint64) { } } } - -type ready struct { - ok bool - c *sync.Cond -} - -func newReady() *ready { - return &ready{c: sync.NewCond(&sync.RWMutex{})} -} - -func (r *ready) wait() { - r.c.L.Lock() - for !r.ok { - r.c.Wait() - } - r.c.L.Unlock() -} - -// TODO: Make check() function more sophisticated, in particular -// allow it to behave as "waitWithTimeout". -func (r *ready) check() bool { - rwMutex := r.c.L.(*sync.RWMutex) - rwMutex.RLock() - defer rwMutex.RUnlock() - return r.ok -} - -func (r *ready) set(ok bool) { - r.c.L.Lock() - defer r.c.L.Unlock() - r.ok = ok - r.c.Broadcast() -} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go index b71260102e2..5f0ba257a6b 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go @@ -337,7 +337,9 @@ func TestGetListCacheBypass(t *testing.T) { result := &example.PodList{} // Wait until cacher is initialized. - cacher.ready.wait() + if err := cacher.ready.wait(); err != nil { + t.Fatalf("unexpected error waiting for the cache to be ready") + } // Inject error to underlying layer and check if cacher is not bypassed. backingStorage.err = errDummy @@ -374,7 +376,9 @@ func TestGetListNonRecursiveCacheBypass(t *testing.T) { result := &example.PodList{} // Wait until cacher is initialized. - cacher.ready.wait() + if err := cacher.ready.wait(); err != nil { + t.Fatalf("unexpected error waiting for the cache to be ready") + } // Inject error to underlying layer and check if cacher is not bypassed. backingStorage.err = errDummy @@ -406,7 +410,9 @@ func TestGetCacheBypass(t *testing.T) { result := &example.Pod{} // Wait until cacher is initialized. - cacher.ready.wait() + if err := cacher.ready.wait(); err != nil { + t.Fatalf("unexpected error waiting for the cache to be ready") + } // Inject error to underlying layer and check if cacher is not bypassed. backingStorage.err = errDummy @@ -436,7 +442,9 @@ func TestWatcherNotGoingBackInTime(t *testing.T) { defer cacher.Stop() // Wait until cacher is initialized. - cacher.ready.wait() + if err := cacher.ready.wait(); err != nil { + t.Fatalf("unexpected error waiting for the cache to be ready") + } // Ensure there is some budget for slowing down processing. cacher.dispatchTimeoutBudget.returnUnused(100 * time.Millisecond) @@ -561,7 +569,9 @@ func TestCacheWatcherStoppedOnDestroy(t *testing.T) { defer cacher.Stop() // Wait until cacher is initialized. - cacher.ready.wait() + if err := cacher.ready.wait(); err != nil { + t.Fatalf("unexpected error waiting for the cache to be ready") + } w, err := cacher.Watch(context.Background(), "pods/ns", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything}) if err != nil { @@ -591,6 +601,68 @@ func TestCacheWatcherStoppedOnDestroy(t *testing.T) { } +func TestCacheDontAcceptRequestsStopped(t *testing.T) { + backingStorage := &dummyStorage{} + cacher, _, err := newTestCacher(backingStorage) + if err != nil { + t.Fatalf("Couldn't create cacher: %v", err) + } + + // Wait until cacher is initialized. + if err := cacher.ready.wait(); err != nil { + t.Fatalf("unexpected error waiting for the cache to be ready") + } + + w, err := cacher.Watch(context.Background(), "pods/ns", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything}) + if err != nil { + t.Fatalf("Failed to create watch: %v", err) + } + + watchClosed := make(chan struct{}) + go func() { + defer close(watchClosed) + for event := range w.ResultChan() { + switch event.Type { + case watch.Added, watch.Modified, watch.Deleted: + // ok + default: + t.Errorf("unexpected event %#v", event) + } + } + }() + + cacher.Stop() + + _, err = cacher.Watch(context.Background(), "pods/ns", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything}) + if err == nil { + t.Fatalf("Success to create Watch: %v", err) + } + + result := &example.Pod{} + err = cacher.Get(context.TODO(), "pods/ns/pod-0", storage.GetOptions{ + IgnoreNotFound: true, + ResourceVersion: "1", + }, result) + if err == nil { + t.Fatalf("Success to create Get: %v", err) + } + + err = cacher.GetList(context.TODO(), "pods/ns", storage.ListOptions{ + ResourceVersion: "1", + Recursive: true, + }, result) + if err == nil { + t.Fatalf("Success to create GetList: %v", err) + } + + select { + case <-watchClosed: + case <-time.After(wait.ForeverTestTimeout): + t.Errorf("timed out waiting for watch to close") + } + +} + func TestTimeBucketWatchersBasic(t *testing.T) { filter := func(_ string, _ labels.Set, _ fields.Set) bool { return true @@ -642,7 +714,9 @@ func TestCacherNoLeakWithMultipleWatchers(t *testing.T) { defer cacher.Stop() // Wait until cacher is initialized. - cacher.ready.wait() + if err := cacher.ready.wait(); err != nil { + t.Fatalf("unexpected error waiting for the cache to be ready") + } pred := storage.Everything pred.AllowWatchBookmarks = true @@ -738,7 +812,9 @@ func testCacherSendBookmarkEvents(t *testing.T, allowWatchBookmarks, expectedBoo defer cacher.Stop() // Wait until cacher is initialized. - cacher.ready.wait() + if err := cacher.ready.wait(); err != nil { + t.Fatalf("unexpected error waiting for the cache to be ready") + } pred := storage.Everything pred.AllowWatchBookmarks = allowWatchBookmarks @@ -836,7 +912,9 @@ func TestCacherSendsMultipleWatchBookmarks(t *testing.T) { cacher.bookmarkWatchers.bookmarkFrequency = time.Second // Wait until cacher is initialized. - cacher.ready.wait() + if err := cacher.ready.wait(); err != nil { + t.Fatalf("unexpected error waiting for the cache to be ready") + } pred := storage.Everything pred.AllowWatchBookmarks = true @@ -904,7 +982,9 @@ func TestDispatchingBookmarkEventsWithConcurrentStop(t *testing.T) { defer cacher.Stop() // Wait until cacher is initialized. - cacher.ready.wait() + if err := cacher.ready.wait(); err != nil { + t.Fatalf("unexpected error waiting for the cache to be ready") + } // Ensure there is some budget for slowing down processing. cacher.dispatchTimeoutBudget.returnUnused(100 * time.Millisecond) @@ -980,7 +1060,9 @@ func TestBookmarksOnResourceVersionUpdates(t *testing.T) { cacher.bookmarkWatchers = newTimeBucketWatchers(clock.RealClock{}, 2*time.Second) // Wait until cacher is initialized. - cacher.ready.wait() + if err := cacher.ready.wait(); err != nil { + t.Fatalf("unexpected error waiting for the cache to be ready") + } makePod := func(i int) *examplev1.Pod { return &examplev1.Pod{ @@ -1056,7 +1138,9 @@ func TestStartingResourceVersion(t *testing.T) { defer cacher.Stop() // Wait until cacher is initialized. - cacher.ready.wait() + if err := cacher.ready.wait(); err != nil { + t.Fatalf("unexpected error waiting for the cache to be ready") + } // Ensure there is some budget for slowing down processing. // We use the fakeTimeBudget to prevent this test from flaking under @@ -1134,7 +1218,9 @@ func TestDispatchEventWillNotBeBlockedByTimedOutWatcher(t *testing.T) { defer cacher.Stop() // Wait until cacher is initialized. - cacher.ready.wait() + if err := cacher.ready.wait(); err != nil { + t.Fatalf("unexpected error waiting for the cache to be ready") + } // Ensure there is some budget for slowing down processing. // We use the fakeTimeBudget to prevent this test from flaking under @@ -1243,7 +1329,9 @@ func TestCachingDeleteEvents(t *testing.T) { defer cacher.Stop() // Wait until cacher is initialized. - cacher.ready.wait() + if err := cacher.ready.wait(); err != nil { + t.Fatalf("unexpected error waiting for the cache to be ready") + } fooPredicate := storage.SelectionPredicate{ Label: labels.SelectorFromSet(map[string]string{"foo": "true"}), @@ -1323,7 +1411,9 @@ func testCachingObjects(t *testing.T, watchersCount int) { defer cacher.Stop() // Wait until cacher is initialized. - cacher.ready.wait() + if err := cacher.ready.wait(); err != nil { + t.Fatalf("unexpected error waiting for the cache to be ready") + } dispatchedEvents := []*watchCacheEvent{} cacher.watchCache.eventHandler = func(event *watchCacheEvent) { @@ -1417,7 +1507,9 @@ func TestCacheIntervalInvalidationStopsWatch(t *testing.T) { defer cacher.Stop() // Wait until cacher is initialized. - cacher.ready.wait() + if err := cacher.ready.wait(); err != nil { + t.Fatalf("unexpected error waiting for the cache to be ready") + } // Ensure there is enough budget for slow processing since // the entire watch cache is going to be served through the // interval and events won't be popped from the cacheWatcher's diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/ready.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/ready.go new file mode 100644 index 00000000000..8278dd2b2f6 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/ready.go @@ -0,0 +1,96 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cacher + +import ( + "fmt" + "sync" +) + +type status int + +const ( + Pending status = iota + Ready + Stopped +) + +// ready is a three state condition variable that blocks until is Ready if is not Stopped. +// Its initial state is Pending. +type ready struct { + state status + c *sync.Cond +} + +func newReady() *ready { + return &ready{ + c: sync.NewCond(&sync.RWMutex{}), + state: Pending, + } +} + +// wait blocks until it is Ready or Stopped, it returns an error if is Stopped. +func (r *ready) wait() error { + r.c.L.Lock() + defer r.c.L.Unlock() + for r.state == Pending { + r.c.Wait() + } + switch r.state { + case Ready: + return nil + case Stopped: + return fmt.Errorf("apiserver cacher is stopped") + default: + return fmt.Errorf("unexpected apiserver cache state: %v", r.state) + } +} + +// check returns true only if it is Ready. +func (r *ready) check() bool { + // TODO: Make check() function more sophisticated, in particular + // allow it to behave as "waitWithTimeout". + rwMutex := r.c.L.(*sync.RWMutex) + rwMutex.RLock() + defer rwMutex.RUnlock() + return r.state == Ready +} + +// set the state to Pending (false) or Ready (true), it does not have effect if the state is Stopped. +func (r *ready) set(ok bool) { + r.c.L.Lock() + defer r.c.L.Unlock() + if r.state == Stopped { + return + } + if ok { + r.state = Ready + } else { + r.state = Pending + } + r.c.Broadcast() +} + +// stop the condition variable and set it as Stopped. This state is irreversible. +func (r *ready) stop() { + r.c.L.Lock() + defer r.c.L.Unlock() + if r.state != Stopped { + r.state = Stopped + r.c.Broadcast() + } +} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/ready_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/ready_test.go new file mode 100644 index 00000000000..c14a7068a1c --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/ready_test.go @@ -0,0 +1,83 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cacher + +import ( + "testing" +) + +func Test_newReady(t *testing.T) { + errCh := make(chan error, 10) + ready := newReady() + ready.set(false) + // create 10 goroutines waiting for ready + for i := 0; i < 10; i++ { + go func() { + errCh <- ready.wait() + }() + } + ready.set(true) + for i := 0; i < 10; i++ { + if err := <-errCh; err != nil { + t.Errorf("unexpected error on channel %d", i) + } + } +} + +func Test_newReadyStop(t *testing.T) { + errCh := make(chan error, 10) + ready := newReady() + ready.set(false) + // create 10 goroutines waiting for ready and stop + for i := 0; i < 10; i++ { + go func() { + errCh <- ready.wait() + }() + } + ready.stop() + for i := 0; i < 10; i++ { + if err := <-errCh; err == nil { + t.Errorf("unexpected success on channel %d", i) + } + } +} + +func Test_newReadyCheck(t *testing.T) { + ready := newReady() + // it starts as false + if ready.check() { + t.Errorf("unexpected ready state %v", ready.check()) + } + ready.set(true) + if !ready.check() { + t.Errorf("unexpected ready state %v", ready.check()) + } + // stop sets ready to false + ready.stop() + if ready.check() { + t.Errorf("unexpected ready state %v", ready.check()) + } + // can not set to true if is stopped + ready.set(true) + if ready.check() { + t.Errorf("unexpected ready state %v", ready.check()) + } + err := ready.wait() + if err == nil { + t.Errorf("expected error waiting on a stopped state") + } +}