diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index 8620c9fcb73..eada35b1d0a 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -524,7 +524,8 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions return nil, err } - if err := c.ready.wait(ctx); err != nil { + readyGeneration, err := c.ready.waitAndReadGeneration(ctx) + if err != nil { return nil, errors.NewServiceUnavailable(err.Error()) } @@ -616,14 +617,24 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions return newErrWatcher(err), nil } + addedWatcher := false func() { c.Lock() defer c.Unlock() + + if generation, ok := c.ready.checkAndReadGeneration(); generation != readyGeneration || !ok { + // We went unready or are already on a different generation. + // Avoid registering and starting the watch as it will have to be + // terminated immediately anyway. + return + } + // Update watcher.forget function once we can compute it. watcher.forget = forgetWatcher(c, watcher, c.watcherIdx, scope, triggerValue, triggerSupported) // Update the bookMarkAfterResourceVersion watcher.setBookmarkAfterResourceVersion(bookmarkAfterResourceVersionFn()) c.watchers.addWatcher(watcher, c.watcherIdx, scope, triggerValue, triggerSupported) + addedWatcher = true // Add it to the queue only when the client support watch bookmarks. if watcher.allowWatchBookmarks { @@ -632,6 +643,14 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions c.watcherIdx++ }() + if !addedWatcher { + // Watcher isn't really started at this point, so it's safe to just drop it. + // + // We're simulating the immediate watch termination, which boils down to simply + // closing the watcher. + return newImmediateCloseWatcher(), nil + } + go watcher.processInterval(ctx, cacheInterval, startWatchRV) return watcher, nil } @@ -1377,3 +1396,24 @@ func (c *errWatcher) ResultChan() <-chan watch.Event { func (c *errWatcher) Stop() { // no-op } + +// immediateCloseWatcher implements watch.Interface that is immediately closed +type immediateCloseWatcher struct { + result chan watch.Event +} + +func newImmediateCloseWatcher() *immediateCloseWatcher { + watcher := &immediateCloseWatcher{result: make(chan watch.Event)} + close(watcher.result) + return watcher +} + +// Implements watch.Interface. +func (c *immediateCloseWatcher) ResultChan() <-chan watch.Event { + return c.result +} + +// Implements watch.Interface. +func (c *immediateCloseWatcher) Stop() { + // no-op +} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go index 9a1a905cce0..152d92b9aea 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go @@ -128,6 +128,7 @@ type dummyStorage struct { sync.RWMutex err error getListFn func(_ context.Context, _ string, _ storage.ListOptions, listObj runtime.Object) error + watchFn func(_ context.Context, _ string, _ storage.ListOptions) (watch.Interface, error) } type dummyWatch struct { @@ -155,7 +156,10 @@ func (d *dummyStorage) Create(_ context.Context, _ string, _, _ runtime.Object, func (d *dummyStorage) Delete(_ context.Context, _ string, _ runtime.Object, _ *storage.Preconditions, _ storage.ValidateObjectFunc, _ runtime.Object) error { return fmt.Errorf("unimplemented") } -func (d *dummyStorage) Watch(_ context.Context, _ string, _ storage.ListOptions) (watch.Interface, error) { +func (d *dummyStorage) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) { + if d.watchFn != nil { + return d.watchFn(ctx, key, opts) + } d.RLock() defer d.RUnlock() @@ -447,7 +451,7 @@ func TestWatcherNotGoingBackInTime(t *testing.T) { } } -func TestCacheDontAcceptRequestsStopped(t *testing.T) { +func TestCacherDontAcceptRequestsStopped(t *testing.T) { backingStorage := &dummyStorage{} cacher, _, err := newTestCacher(backingStorage) if err != nil { @@ -509,6 +513,117 @@ func TestCacheDontAcceptRequestsStopped(t *testing.T) { } } +func TestCacherDontMissEventsOnReinitialization(t *testing.T) { + makePod := func(i int) *example.Pod { + return &example.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("pod-%d", i), + Namespace: "ns", + ResourceVersion: fmt.Sprintf("%d", i), + }, + } + } + + listCalls, watchCalls := 0, 0 + backingStorage := &dummyStorage{ + getListFn: func(_ context.Context, _ string, _ storage.ListOptions, listObj runtime.Object) error { + podList := listObj.(*example.PodList) + var err error + switch listCalls { + case 0: + podList.ListMeta = metav1.ListMeta{ResourceVersion: "1"} + case 1: + podList.ListMeta = metav1.ListMeta{ResourceVersion: "10"} + default: + err = fmt.Errorf("unexpected list call") + } + listCalls++ + return err + }, + watchFn: func(_ context.Context, _ string, _ storage.ListOptions) (watch.Interface, error) { + var w *watch.FakeWatcher + var err error + switch watchCalls { + case 0: + w = watch.NewFakeWithChanSize(10, false) + for i := 2; i < 8; i++ { + w.Add(makePod(i)) + } + // Emit an error to force relisting. + w.Error(nil) + w.Stop() + case 1: + w = watch.NewFakeWithChanSize(10, false) + for i := 12; i < 18; i++ { + w.Add(makePod(i)) + } + w.Stop() + default: + err = fmt.Errorf("unexpected watch call") + } + watchCalls++ + return w, err + }, + } + cacher, _, err := newTestCacher(backingStorage) + if err != nil { + t.Fatalf("Couldn't create cacher: %v", err) + } + defer cacher.Stop() + + concurrency := 1000 + wg := sync.WaitGroup{} + wg.Add(concurrency) + + // Ensure that test doesn't deadlock if cacher already processed everything + // and get back into Pending state before some watches get called. + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + + errCh := make(chan error, concurrency) + for i := 0; i < concurrency; i++ { + go func() { + defer wg.Done() + w, err := cacher.Watch(ctx, "pods", storage.ListOptions{ResourceVersion: "1", Predicate: storage.Everything}) + if err != nil { + // Watch failed to initialize (this most probably means that cacher + // already moved back to Pending state before watch initialized. + // Ignore this case. + return + } + defer w.Stop() + + prevRV := -1 + for event := range w.ResultChan() { + if event.Type == watch.Error { + break + } + object := event.Object + if co, ok := object.(runtime.CacheableObject); ok { + object = co.GetObject() + } + rv, err := strconv.Atoi(object.(*example.Pod).ResourceVersion) + if err != nil { + errCh <- fmt.Errorf("incorrect resource version: %v", err) + return + } + if prevRV != -1 && prevRV+1 != rv { + errCh <- fmt.Errorf("unexpected event received, prevRV=%d, rv=%d", prevRV, rv) + return + } + prevRV = rv + } + + }() + } + wg.Wait() + close(errCh) + + for err := range errCh { + t.Error(err) + } +} + func TestCacherNoLeakWithMultipleWatchers(t *testing.T) { backingStorage := &dummyStorage{} cacher, _, err := newTestCacher(backingStorage) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/ready.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/ready.go index 47e03fe9e27..012d6d585c9 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/ready.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/ready.go @@ -39,7 +39,8 @@ const ( // └---------------------------┘ type ready struct { state status // represent the state of the variable - lock sync.RWMutex // protect the state variable + generation int // represent the number of times we have transtioned to ready + lock sync.RWMutex // protect the state and generation variables restartLock sync.Mutex // protect the transition from ready to pending where the channel is recreated waitCh chan struct{} // blocks until is ready or stopped } @@ -60,11 +61,18 @@ func (r *ready) done() chan struct{} { // wait blocks until it is Ready or Stopped, it returns an error if is Stopped. func (r *ready) wait(ctx context.Context) error { + _, err := r.waitAndReadGeneration(ctx) + return err +} + +// waitAndReadGenration blocks until it is Ready or Stopped and returns number +// of times we entered ready state if Ready and error otherwise. +func (r *ready) waitAndReadGeneration(ctx context.Context) (int, error) { for { // r.done() only blocks if state is Pending select { case <-ctx.Done(): - return ctx.Err() + return 0, ctx.Err() case <-r.done(): } @@ -79,23 +87,30 @@ func (r *ready) wait(ctx context.Context) error { // waiting despite the state moved back to Pending. r.lock.RUnlock() case Ready: + generation := r.generation r.lock.RUnlock() - return nil + return generation, nil case Stopped: r.lock.RUnlock() - return fmt.Errorf("apiserver cacher is stopped") + return 0, fmt.Errorf("apiserver cacher is stopped") default: r.lock.RUnlock() - return fmt.Errorf("unexpected apiserver cache state: %v", r.state) + return 0, fmt.Errorf("unexpected apiserver cache state: %v", r.state) } } } // check returns true only if it is Ready. func (r *ready) check() bool { + _, ok := r.checkAndReadGeneration() + return ok +} + +// checkAndReadGeneration returns the current generation and whether it is Ready. +func (r *ready) checkAndReadGeneration() (int, bool) { r.lock.RLock() defer r.lock.RUnlock() - return r.state == Ready + return r.generation, r.state == Ready } // set the state to Pending (false) or Ready (true), it does not have effect if the state is Stopped. @@ -107,6 +122,7 @@ func (r *ready) set(ok bool) { } if ok && r.state == Pending { r.state = Ready + r.generation++ select { case <-r.waitCh: default: diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/ready_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/ready_test.go index 12560a6fa27..34c20592550 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/ready_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/ready_test.go @@ -52,9 +52,18 @@ func Test_newReadySetIdempotent(t *testing.T) { ready.set(false) ready.set(false) ready.set(false) + if generation, ok := ready.checkAndReadGeneration(); generation != 0 || ok { + t.Errorf("unexpected state: generation=%v ready=%v", generation, ok) + } + ready.set(true) + if generation, ok := ready.checkAndReadGeneration(); generation != 1 || !ok { + t.Errorf("unexpected state: generation=%v ready=%v", generation, ok) + } ready.set(true) ready.set(true) - ready.set(true) + if generation, ok := ready.checkAndReadGeneration(); generation != 1 || !ok { + t.Errorf("unexpected state: generation=%v ready=%v", generation, ok) + } ready.set(false) // create 10 goroutines waiting for ready and stop for i := 0; i < 10; i++ { @@ -68,6 +77,9 @@ func Test_newReadySetIdempotent(t *testing.T) { t.Errorf("ready should be blocking") } ready.set(true) + if generation, ok := ready.checkAndReadGeneration(); generation != 2 || !ok { + t.Errorf("unexpected state: generation=%v ready=%v", generation, ok) + } for i := 0; i < 10; i++ { if err := <-errCh; err != nil { t.Errorf("unexpected error on channel %d", i)