mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 11:50:44 +00:00
Merge pull request #108414 from aojea/cacher_context
cacher: don't accept requests if stopped
This commit is contained in:
commit
999b1bbe92
@ -462,7 +462,9 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
c.ready.wait()
|
if err := c.ready.wait(); err != nil {
|
||||||
|
return nil, errors.NewServiceUnavailable(err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
triggerValue, triggerSupported := "", false
|
triggerValue, triggerSupported := "", false
|
||||||
if c.indexedTrigger != nil {
|
if c.indexedTrigger != nil {
|
||||||
@ -559,7 +561,9 @@ func (c *Cacher) Get(ctx context.Context, key string, opts storage.GetOptions, o
|
|||||||
|
|
||||||
// Do not create a trace - it's not for free and there are tons
|
// Do not create a trace - it's not for free and there are tons
|
||||||
// of Get requests. We can add it if it will be really needed.
|
// of Get requests. We can add it if it will be really needed.
|
||||||
c.ready.wait()
|
if err := c.ready.wait(); err != nil {
|
||||||
|
return errors.NewServiceUnavailable(err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
objVal, err := conversion.EnforcePtr(objPtr)
|
objVal, err := conversion.EnforcePtr(objPtr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -644,7 +648,9 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
|
|||||||
trace := utiltrace.New("cacher list", utiltrace.Field{Key: "type", Value: c.objectType.String()})
|
trace := utiltrace.New("cacher list", utiltrace.Field{Key: "type", Value: c.objectType.String()})
|
||||||
defer trace.LogIfLong(500 * time.Millisecond)
|
defer trace.LogIfLong(500 * time.Millisecond)
|
||||||
|
|
||||||
c.ready.wait()
|
if err := c.ready.wait(); err != nil {
|
||||||
|
return errors.NewServiceUnavailable(err.Error())
|
||||||
|
}
|
||||||
trace.Step("Ready")
|
trace.Step("Ready")
|
||||||
|
|
||||||
// List elements with at least 'listRV' from cache.
|
// List elements with at least 'listRV' from cache.
|
||||||
@ -1011,6 +1017,7 @@ func (c *Cacher) Stop() {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
c.stopped = true
|
c.stopped = true
|
||||||
|
c.ready.stop()
|
||||||
c.stopLock.Unlock()
|
c.stopLock.Unlock()
|
||||||
close(c.stopCh)
|
close(c.stopCh)
|
||||||
c.stopWg.Wait()
|
c.stopWg.Wait()
|
||||||
@ -1040,7 +1047,9 @@ func filterWithAttrsFunction(key string, p storage.SelectionPredicate) filterWit
|
|||||||
|
|
||||||
// LastSyncResourceVersion returns resource version to which the underlying cache is synced.
|
// LastSyncResourceVersion returns resource version to which the underlying cache is synced.
|
||||||
func (c *Cacher) LastSyncResourceVersion() (uint64, error) {
|
func (c *Cacher) LastSyncResourceVersion() (uint64, error) {
|
||||||
c.ready.wait()
|
if err := c.ready.wait(); err != nil {
|
||||||
|
return 0, errors.NewServiceUnavailable(err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
resourceVersion := c.reflector.LastSyncResourceVersion()
|
resourceVersion := c.reflector.LastSyncResourceVersion()
|
||||||
return c.versioner.ParseResourceVersion(resourceVersion)
|
return c.versioner.ParseResourceVersion(resourceVersion)
|
||||||
@ -1426,36 +1435,3 @@ func (c *cacheWatcher) process(ctx context.Context, resourceVersion uint64) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type ready struct {
|
|
||||||
ok bool
|
|
||||||
c *sync.Cond
|
|
||||||
}
|
|
||||||
|
|
||||||
func newReady() *ready {
|
|
||||||
return &ready{c: sync.NewCond(&sync.RWMutex{})}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *ready) wait() {
|
|
||||||
r.c.L.Lock()
|
|
||||||
for !r.ok {
|
|
||||||
r.c.Wait()
|
|
||||||
}
|
|
||||||
r.c.L.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: Make check() function more sophisticated, in particular
|
|
||||||
// allow it to behave as "waitWithTimeout".
|
|
||||||
func (r *ready) check() bool {
|
|
||||||
rwMutex := r.c.L.(*sync.RWMutex)
|
|
||||||
rwMutex.RLock()
|
|
||||||
defer rwMutex.RUnlock()
|
|
||||||
return r.ok
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *ready) set(ok bool) {
|
|
||||||
r.c.L.Lock()
|
|
||||||
defer r.c.L.Unlock()
|
|
||||||
r.ok = ok
|
|
||||||
r.c.Broadcast()
|
|
||||||
}
|
|
||||||
|
@ -337,7 +337,9 @@ func TestGetListCacheBypass(t *testing.T) {
|
|||||||
result := &example.PodList{}
|
result := &example.PodList{}
|
||||||
|
|
||||||
// Wait until cacher is initialized.
|
// Wait until cacher is initialized.
|
||||||
cacher.ready.wait()
|
if err := cacher.ready.wait(); err != nil {
|
||||||
|
t.Fatalf("unexpected error waiting for the cache to be ready")
|
||||||
|
}
|
||||||
|
|
||||||
// Inject error to underlying layer and check if cacher is not bypassed.
|
// Inject error to underlying layer and check if cacher is not bypassed.
|
||||||
backingStorage.err = errDummy
|
backingStorage.err = errDummy
|
||||||
@ -374,7 +376,9 @@ func TestGetListNonRecursiveCacheBypass(t *testing.T) {
|
|||||||
result := &example.PodList{}
|
result := &example.PodList{}
|
||||||
|
|
||||||
// Wait until cacher is initialized.
|
// Wait until cacher is initialized.
|
||||||
cacher.ready.wait()
|
if err := cacher.ready.wait(); err != nil {
|
||||||
|
t.Fatalf("unexpected error waiting for the cache to be ready")
|
||||||
|
}
|
||||||
|
|
||||||
// Inject error to underlying layer and check if cacher is not bypassed.
|
// Inject error to underlying layer and check if cacher is not bypassed.
|
||||||
backingStorage.err = errDummy
|
backingStorage.err = errDummy
|
||||||
@ -406,7 +410,9 @@ func TestGetCacheBypass(t *testing.T) {
|
|||||||
result := &example.Pod{}
|
result := &example.Pod{}
|
||||||
|
|
||||||
// Wait until cacher is initialized.
|
// Wait until cacher is initialized.
|
||||||
cacher.ready.wait()
|
if err := cacher.ready.wait(); err != nil {
|
||||||
|
t.Fatalf("unexpected error waiting for the cache to be ready")
|
||||||
|
}
|
||||||
|
|
||||||
// Inject error to underlying layer and check if cacher is not bypassed.
|
// Inject error to underlying layer and check if cacher is not bypassed.
|
||||||
backingStorage.err = errDummy
|
backingStorage.err = errDummy
|
||||||
@ -436,7 +442,9 @@ func TestWatcherNotGoingBackInTime(t *testing.T) {
|
|||||||
defer cacher.Stop()
|
defer cacher.Stop()
|
||||||
|
|
||||||
// Wait until cacher is initialized.
|
// Wait until cacher is initialized.
|
||||||
cacher.ready.wait()
|
if err := cacher.ready.wait(); err != nil {
|
||||||
|
t.Fatalf("unexpected error waiting for the cache to be ready")
|
||||||
|
}
|
||||||
|
|
||||||
// Ensure there is some budget for slowing down processing.
|
// Ensure there is some budget for slowing down processing.
|
||||||
cacher.dispatchTimeoutBudget.returnUnused(100 * time.Millisecond)
|
cacher.dispatchTimeoutBudget.returnUnused(100 * time.Millisecond)
|
||||||
@ -561,7 +569,9 @@ func TestCacheWatcherStoppedOnDestroy(t *testing.T) {
|
|||||||
defer cacher.Stop()
|
defer cacher.Stop()
|
||||||
|
|
||||||
// Wait until cacher is initialized.
|
// Wait until cacher is initialized.
|
||||||
cacher.ready.wait()
|
if err := cacher.ready.wait(); err != nil {
|
||||||
|
t.Fatalf("unexpected error waiting for the cache to be ready")
|
||||||
|
}
|
||||||
|
|
||||||
w, err := cacher.Watch(context.Background(), "pods/ns", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
|
w, err := cacher.Watch(context.Background(), "pods/ns", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -591,6 +601,68 @@ func TestCacheWatcherStoppedOnDestroy(t *testing.T) {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestCacheDontAcceptRequestsStopped(t *testing.T) {
|
||||||
|
backingStorage := &dummyStorage{}
|
||||||
|
cacher, _, err := newTestCacher(backingStorage)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Couldn't create cacher: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait until cacher is initialized.
|
||||||
|
if err := cacher.ready.wait(); err != nil {
|
||||||
|
t.Fatalf("unexpected error waiting for the cache to be ready")
|
||||||
|
}
|
||||||
|
|
||||||
|
w, err := cacher.Watch(context.Background(), "pods/ns", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to create watch: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
watchClosed := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
defer close(watchClosed)
|
||||||
|
for event := range w.ResultChan() {
|
||||||
|
switch event.Type {
|
||||||
|
case watch.Added, watch.Modified, watch.Deleted:
|
||||||
|
// ok
|
||||||
|
default:
|
||||||
|
t.Errorf("unexpected event %#v", event)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
cacher.Stop()
|
||||||
|
|
||||||
|
_, err = cacher.Watch(context.Background(), "pods/ns", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
|
||||||
|
if err == nil {
|
||||||
|
t.Fatalf("Success to create Watch: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
result := &example.Pod{}
|
||||||
|
err = cacher.Get(context.TODO(), "pods/ns/pod-0", storage.GetOptions{
|
||||||
|
IgnoreNotFound: true,
|
||||||
|
ResourceVersion: "1",
|
||||||
|
}, result)
|
||||||
|
if err == nil {
|
||||||
|
t.Fatalf("Success to create Get: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = cacher.GetList(context.TODO(), "pods/ns", storage.ListOptions{
|
||||||
|
ResourceVersion: "1",
|
||||||
|
Recursive: true,
|
||||||
|
}, result)
|
||||||
|
if err == nil {
|
||||||
|
t.Fatalf("Success to create GetList: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-watchClosed:
|
||||||
|
case <-time.After(wait.ForeverTestTimeout):
|
||||||
|
t.Errorf("timed out waiting for watch to close")
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
func TestTimeBucketWatchersBasic(t *testing.T) {
|
func TestTimeBucketWatchersBasic(t *testing.T) {
|
||||||
filter := func(_ string, _ labels.Set, _ fields.Set) bool {
|
filter := func(_ string, _ labels.Set, _ fields.Set) bool {
|
||||||
return true
|
return true
|
||||||
@ -642,7 +714,9 @@ func TestCacherNoLeakWithMultipleWatchers(t *testing.T) {
|
|||||||
defer cacher.Stop()
|
defer cacher.Stop()
|
||||||
|
|
||||||
// Wait until cacher is initialized.
|
// Wait until cacher is initialized.
|
||||||
cacher.ready.wait()
|
if err := cacher.ready.wait(); err != nil {
|
||||||
|
t.Fatalf("unexpected error waiting for the cache to be ready")
|
||||||
|
}
|
||||||
pred := storage.Everything
|
pred := storage.Everything
|
||||||
pred.AllowWatchBookmarks = true
|
pred.AllowWatchBookmarks = true
|
||||||
|
|
||||||
@ -738,7 +812,9 @@ func testCacherSendBookmarkEvents(t *testing.T, allowWatchBookmarks, expectedBoo
|
|||||||
defer cacher.Stop()
|
defer cacher.Stop()
|
||||||
|
|
||||||
// Wait until cacher is initialized.
|
// Wait until cacher is initialized.
|
||||||
cacher.ready.wait()
|
if err := cacher.ready.wait(); err != nil {
|
||||||
|
t.Fatalf("unexpected error waiting for the cache to be ready")
|
||||||
|
}
|
||||||
pred := storage.Everything
|
pred := storage.Everything
|
||||||
pred.AllowWatchBookmarks = allowWatchBookmarks
|
pred.AllowWatchBookmarks = allowWatchBookmarks
|
||||||
|
|
||||||
@ -836,7 +912,9 @@ func TestCacherSendsMultipleWatchBookmarks(t *testing.T) {
|
|||||||
cacher.bookmarkWatchers.bookmarkFrequency = time.Second
|
cacher.bookmarkWatchers.bookmarkFrequency = time.Second
|
||||||
|
|
||||||
// Wait until cacher is initialized.
|
// Wait until cacher is initialized.
|
||||||
cacher.ready.wait()
|
if err := cacher.ready.wait(); err != nil {
|
||||||
|
t.Fatalf("unexpected error waiting for the cache to be ready")
|
||||||
|
}
|
||||||
pred := storage.Everything
|
pred := storage.Everything
|
||||||
pred.AllowWatchBookmarks = true
|
pred.AllowWatchBookmarks = true
|
||||||
|
|
||||||
@ -904,7 +982,9 @@ func TestDispatchingBookmarkEventsWithConcurrentStop(t *testing.T) {
|
|||||||
defer cacher.Stop()
|
defer cacher.Stop()
|
||||||
|
|
||||||
// Wait until cacher is initialized.
|
// Wait until cacher is initialized.
|
||||||
cacher.ready.wait()
|
if err := cacher.ready.wait(); err != nil {
|
||||||
|
t.Fatalf("unexpected error waiting for the cache to be ready")
|
||||||
|
}
|
||||||
|
|
||||||
// Ensure there is some budget for slowing down processing.
|
// Ensure there is some budget for slowing down processing.
|
||||||
cacher.dispatchTimeoutBudget.returnUnused(100 * time.Millisecond)
|
cacher.dispatchTimeoutBudget.returnUnused(100 * time.Millisecond)
|
||||||
@ -980,7 +1060,9 @@ func TestBookmarksOnResourceVersionUpdates(t *testing.T) {
|
|||||||
cacher.bookmarkWatchers = newTimeBucketWatchers(clock.RealClock{}, 2*time.Second)
|
cacher.bookmarkWatchers = newTimeBucketWatchers(clock.RealClock{}, 2*time.Second)
|
||||||
|
|
||||||
// Wait until cacher is initialized.
|
// Wait until cacher is initialized.
|
||||||
cacher.ready.wait()
|
if err := cacher.ready.wait(); err != nil {
|
||||||
|
t.Fatalf("unexpected error waiting for the cache to be ready")
|
||||||
|
}
|
||||||
|
|
||||||
makePod := func(i int) *examplev1.Pod {
|
makePod := func(i int) *examplev1.Pod {
|
||||||
return &examplev1.Pod{
|
return &examplev1.Pod{
|
||||||
@ -1056,7 +1138,9 @@ func TestStartingResourceVersion(t *testing.T) {
|
|||||||
defer cacher.Stop()
|
defer cacher.Stop()
|
||||||
|
|
||||||
// Wait until cacher is initialized.
|
// Wait until cacher is initialized.
|
||||||
cacher.ready.wait()
|
if err := cacher.ready.wait(); err != nil {
|
||||||
|
t.Fatalf("unexpected error waiting for the cache to be ready")
|
||||||
|
}
|
||||||
|
|
||||||
// Ensure there is some budget for slowing down processing.
|
// Ensure there is some budget for slowing down processing.
|
||||||
// We use the fakeTimeBudget to prevent this test from flaking under
|
// We use the fakeTimeBudget to prevent this test from flaking under
|
||||||
@ -1134,7 +1218,9 @@ func TestDispatchEventWillNotBeBlockedByTimedOutWatcher(t *testing.T) {
|
|||||||
defer cacher.Stop()
|
defer cacher.Stop()
|
||||||
|
|
||||||
// Wait until cacher is initialized.
|
// Wait until cacher is initialized.
|
||||||
cacher.ready.wait()
|
if err := cacher.ready.wait(); err != nil {
|
||||||
|
t.Fatalf("unexpected error waiting for the cache to be ready")
|
||||||
|
}
|
||||||
|
|
||||||
// Ensure there is some budget for slowing down processing.
|
// Ensure there is some budget for slowing down processing.
|
||||||
// We use the fakeTimeBudget to prevent this test from flaking under
|
// We use the fakeTimeBudget to prevent this test from flaking under
|
||||||
@ -1243,7 +1329,9 @@ func TestCachingDeleteEvents(t *testing.T) {
|
|||||||
defer cacher.Stop()
|
defer cacher.Stop()
|
||||||
|
|
||||||
// Wait until cacher is initialized.
|
// Wait until cacher is initialized.
|
||||||
cacher.ready.wait()
|
if err := cacher.ready.wait(); err != nil {
|
||||||
|
t.Fatalf("unexpected error waiting for the cache to be ready")
|
||||||
|
}
|
||||||
|
|
||||||
fooPredicate := storage.SelectionPredicate{
|
fooPredicate := storage.SelectionPredicate{
|
||||||
Label: labels.SelectorFromSet(map[string]string{"foo": "true"}),
|
Label: labels.SelectorFromSet(map[string]string{"foo": "true"}),
|
||||||
@ -1323,7 +1411,9 @@ func testCachingObjects(t *testing.T, watchersCount int) {
|
|||||||
defer cacher.Stop()
|
defer cacher.Stop()
|
||||||
|
|
||||||
// Wait until cacher is initialized.
|
// Wait until cacher is initialized.
|
||||||
cacher.ready.wait()
|
if err := cacher.ready.wait(); err != nil {
|
||||||
|
t.Fatalf("unexpected error waiting for the cache to be ready")
|
||||||
|
}
|
||||||
|
|
||||||
dispatchedEvents := []*watchCacheEvent{}
|
dispatchedEvents := []*watchCacheEvent{}
|
||||||
cacher.watchCache.eventHandler = func(event *watchCacheEvent) {
|
cacher.watchCache.eventHandler = func(event *watchCacheEvent) {
|
||||||
@ -1417,7 +1507,9 @@ func TestCacheIntervalInvalidationStopsWatch(t *testing.T) {
|
|||||||
defer cacher.Stop()
|
defer cacher.Stop()
|
||||||
|
|
||||||
// Wait until cacher is initialized.
|
// Wait until cacher is initialized.
|
||||||
cacher.ready.wait()
|
if err := cacher.ready.wait(); err != nil {
|
||||||
|
t.Fatalf("unexpected error waiting for the cache to be ready")
|
||||||
|
}
|
||||||
// Ensure there is enough budget for slow processing since
|
// Ensure there is enough budget for slow processing since
|
||||||
// the entire watch cache is going to be served through the
|
// the entire watch cache is going to be served through the
|
||||||
// interval and events won't be popped from the cacheWatcher's
|
// interval and events won't be popped from the cacheWatcher's
|
||||||
|
96
staging/src/k8s.io/apiserver/pkg/storage/cacher/ready.go
Normal file
96
staging/src/k8s.io/apiserver/pkg/storage/cacher/ready.go
Normal file
@ -0,0 +1,96 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2022 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package cacher
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
)
|
||||||
|
|
||||||
|
type status int
|
||||||
|
|
||||||
|
const (
|
||||||
|
Pending status = iota
|
||||||
|
Ready
|
||||||
|
Stopped
|
||||||
|
)
|
||||||
|
|
||||||
|
// ready is a three state condition variable that blocks until is Ready if is not Stopped.
|
||||||
|
// Its initial state is Pending.
|
||||||
|
type ready struct {
|
||||||
|
state status
|
||||||
|
c *sync.Cond
|
||||||
|
}
|
||||||
|
|
||||||
|
func newReady() *ready {
|
||||||
|
return &ready{
|
||||||
|
c: sync.NewCond(&sync.RWMutex{}),
|
||||||
|
state: Pending,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// wait blocks until it is Ready or Stopped, it returns an error if is Stopped.
|
||||||
|
func (r *ready) wait() error {
|
||||||
|
r.c.L.Lock()
|
||||||
|
defer r.c.L.Unlock()
|
||||||
|
for r.state == Pending {
|
||||||
|
r.c.Wait()
|
||||||
|
}
|
||||||
|
switch r.state {
|
||||||
|
case Ready:
|
||||||
|
return nil
|
||||||
|
case Stopped:
|
||||||
|
return fmt.Errorf("apiserver cacher is stopped")
|
||||||
|
default:
|
||||||
|
return fmt.Errorf("unexpected apiserver cache state: %v", r.state)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// check returns true only if it is Ready.
|
||||||
|
func (r *ready) check() bool {
|
||||||
|
// TODO: Make check() function more sophisticated, in particular
|
||||||
|
// allow it to behave as "waitWithTimeout".
|
||||||
|
rwMutex := r.c.L.(*sync.RWMutex)
|
||||||
|
rwMutex.RLock()
|
||||||
|
defer rwMutex.RUnlock()
|
||||||
|
return r.state == Ready
|
||||||
|
}
|
||||||
|
|
||||||
|
// set the state to Pending (false) or Ready (true), it does not have effect if the state is Stopped.
|
||||||
|
func (r *ready) set(ok bool) {
|
||||||
|
r.c.L.Lock()
|
||||||
|
defer r.c.L.Unlock()
|
||||||
|
if r.state == Stopped {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if ok {
|
||||||
|
r.state = Ready
|
||||||
|
} else {
|
||||||
|
r.state = Pending
|
||||||
|
}
|
||||||
|
r.c.Broadcast()
|
||||||
|
}
|
||||||
|
|
||||||
|
// stop the condition variable and set it as Stopped. This state is irreversible.
|
||||||
|
func (r *ready) stop() {
|
||||||
|
r.c.L.Lock()
|
||||||
|
defer r.c.L.Unlock()
|
||||||
|
if r.state != Stopped {
|
||||||
|
r.state = Stopped
|
||||||
|
r.c.Broadcast()
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,83 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2022 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package cacher
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func Test_newReady(t *testing.T) {
|
||||||
|
errCh := make(chan error, 10)
|
||||||
|
ready := newReady()
|
||||||
|
ready.set(false)
|
||||||
|
// create 10 goroutines waiting for ready
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
go func() {
|
||||||
|
errCh <- ready.wait()
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
ready.set(true)
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
if err := <-errCh; err != nil {
|
||||||
|
t.Errorf("unexpected error on channel %d", i)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func Test_newReadyStop(t *testing.T) {
|
||||||
|
errCh := make(chan error, 10)
|
||||||
|
ready := newReady()
|
||||||
|
ready.set(false)
|
||||||
|
// create 10 goroutines waiting for ready and stop
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
go func() {
|
||||||
|
errCh <- ready.wait()
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
ready.stop()
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
if err := <-errCh; err == nil {
|
||||||
|
t.Errorf("unexpected success on channel %d", i)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func Test_newReadyCheck(t *testing.T) {
|
||||||
|
ready := newReady()
|
||||||
|
// it starts as false
|
||||||
|
if ready.check() {
|
||||||
|
t.Errorf("unexpected ready state %v", ready.check())
|
||||||
|
}
|
||||||
|
ready.set(true)
|
||||||
|
if !ready.check() {
|
||||||
|
t.Errorf("unexpected ready state %v", ready.check())
|
||||||
|
}
|
||||||
|
// stop sets ready to false
|
||||||
|
ready.stop()
|
||||||
|
if ready.check() {
|
||||||
|
t.Errorf("unexpected ready state %v", ready.check())
|
||||||
|
}
|
||||||
|
// can not set to true if is stopped
|
||||||
|
ready.set(true)
|
||||||
|
if ready.check() {
|
||||||
|
t.Errorf("unexpected ready state %v", ready.check())
|
||||||
|
}
|
||||||
|
err := ready.wait()
|
||||||
|
if err == nil {
|
||||||
|
t.Errorf("expected error waiting on a stopped state")
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user