Mirror of https://github.com/k3s-io/kubernetes.git (synced 2025-07-20 02:11:09 +00:00)
Merge pull request #116172 from wojtek-t/fix_watch_cache

Fix missed watch events when a watch is initialized simultaneously with reinitialization of the watch cache.

Commit: 856d6d9caa
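The bug: Cacher.Watch first waits for the watch cache to become ready, then registers the new watcher under the cacher's lock. If the watch cache is reinitialized in between (it goes back to Pending and relists), the watcher gets registered against the freshly relisted cache while its start resource version predates the relist, so events delivered during reinitialization are silently lost. The fix below counts every Pending-to-Ready transition with a generation number: Watch records the generation it observed while waiting, re-checks it under the lock, and refuses to register a watcher from a stale generation, returning an immediately-closed watch instead. A minimal standalone sketch of that pattern (readyGate, setReady, and register are illustrative names, not the cacher's API):

package main

import (
	"fmt"
	"sync"
)

// readyGate is a simplified readiness flag that counts transitions to ready.
// Illustrative only; the real type is the cacher's ready struct.
type readyGate struct {
	mu         sync.RWMutex
	ready      bool
	generation int
}

func (r *readyGate) setReady(ok bool) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if ok && !r.ready {
		r.generation++ // every reinitialization bumps the generation
	}
	r.ready = ok
}

func (r *readyGate) checkAndReadGeneration() (int, bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	return r.generation, r.ready
}

// register succeeds only if the generation observed while waiting is still
// the current one; otherwise the watch must be terminated immediately.
func register(r *readyGate, seenGeneration int) bool {
	generation, ok := r.checkAndReadGeneration()
	if !ok || generation != seenGeneration {
		return false // went unready or already on a different generation
	}
	// ... add the watcher to the shared index here ...
	return true
}

func main() {
	r := &readyGate{}
	r.setReady(true)
	seen, _ := r.checkAndReadGeneration()
	r.setReady(false) // the watch cache reinitializes...
	r.setReady(true)  // ...and becomes ready again: generation moved on
	fmt.Println(register(r, seen)) // false: stale generation, drop the watch
}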
@@ -524,7 +524,8 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
 		return nil, err
 	}
 
-	if err := c.ready.wait(ctx); err != nil {
+	readyGeneration, err := c.ready.waitAndReadGeneration(ctx)
+	if err != nil {
 		return nil, errors.NewServiceUnavailable(err.Error())
 	}
 
@@ -616,14 +617,24 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
 		return newErrWatcher(err), nil
 	}
 
+	addedWatcher := false
 	func() {
 		c.Lock()
 		defer c.Unlock()
 
+		if generation, ok := c.ready.checkAndReadGeneration(); generation != readyGeneration || !ok {
+			// We went unready or are already on a different generation.
+			// Avoid registering and starting the watch as it will have to be
+			// terminated immediately anyway.
+			return
+		}
+
 		// Update watcher.forget function once we can compute it.
 		watcher.forget = forgetWatcher(c, watcher, c.watcherIdx, scope, triggerValue, triggerSupported)
 		// Update the bookMarkAfterResourceVersion
 		watcher.setBookmarkAfterResourceVersion(bookmarkAfterResourceVersionFn())
 		c.watchers.addWatcher(watcher, c.watcherIdx, scope, triggerValue, triggerSupported)
+		addedWatcher = true
+
 		// Add it to the queue only when the client support watch bookmarks.
 		if watcher.allowWatchBookmarks {
@@ -632,6 +643,14 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
 		c.watcherIdx++
 	}()
 
+	if !addedWatcher {
+		// Watcher isn't really started at this point, so it's safe to just drop it.
+		//
+		// We're simulating the immediate watch termination, which boils down to simply
+		// closing the watcher.
+		return newImmediateCloseWatcher(), nil
+	}
+
 	go watcher.processInterval(ctx, cacheInterval, startWatchRV)
 	return watcher, nil
 }
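Two details of the hunks above are worth calling out: the registration runs inside an anonymous func() so the deferred c.Unlock() fires before the per-watcher goroutine is started, and the addedWatcher flag carries the outcome of the generation check out of the closure. A sketch of that shape (tryRegister and stale are invented names for illustration, not the cacher's code):

package main

import (
	"fmt"
	"sync"
)

// tryRegister mirrors the shape of the registration in Cacher.Watch: the
// critical section lives in an anonymous func so the deferred Unlock runs
// before the caller decides what to return, and a boolean carries the
// outcome out of the closure.
func tryRegister(mu *sync.Mutex, stale func() bool) bool {
	added := false
	func() {
		mu.Lock()
		defer mu.Unlock()
		if stale() {
			// Went unready or moved to a different generation: skip the
			// registration so the watch terminates immediately instead
			// of silently missing events.
			return
		}
		// ... register the watcher with the shared structures here ...
		added = true
	}()
	return added
}

func main() {
	var mu sync.Mutex
	fmt.Println(tryRegister(&mu, func() bool { return false })) // true: registered
	fmt.Println(tryRegister(&mu, func() bool { return true }))  // false: caller drops the watch
}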
@@ -1377,3 +1396,24 @@ func (c *errWatcher) ResultChan() <-chan watch.Event {
 func (c *errWatcher) Stop() {
 	// no-op
 }
+
+// immediateCloseWatcher implements watch.Interface that is immediately closed
+type immediateCloseWatcher struct {
+	result chan watch.Event
+}
+
+func newImmediateCloseWatcher() *immediateCloseWatcher {
+	watcher := &immediateCloseWatcher{result: make(chan watch.Event)}
+	close(watcher.result)
+	return watcher
+}
+
+// Implements watch.Interface.
+func (c *immediateCloseWatcher) ResultChan() <-chan watch.Event {
+	return c.result
+}
+
+// Implements watch.Interface.
+func (c *immediateCloseWatcher) Stop() {
+	// no-op
+}
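newImmediateCloseWatcher relies on a standard Go idiom: a receive from a closed channel completes immediately, so a result channel that is created and closed right away makes any consumer's range loop exit at once, which looks to the client exactly like a watch that terminated before sending anything. A tiny sketch (the event type here is a placeholder, not the real watch.Event):

package main

import "fmt"

type event struct{ name string }

func main() {
	results := make(chan event)
	close(results)
	for ev := range results {
		fmt.Println(ev.name) // never runs: the channel is already closed
	}
	fmt.Println("watch terminated immediately")
}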
@@ -128,6 +128,7 @@ type dummyStorage struct {
 	sync.RWMutex
 	err       error
 	getListFn func(_ context.Context, _ string, _ storage.ListOptions, listObj runtime.Object) error
+	watchFn   func(_ context.Context, _ string, _ storage.ListOptions) (watch.Interface, error)
 }
 
 type dummyWatch struct {
@@ -155,7 +156,10 @@ func (d *dummyStorage) Create(_ context.Context, _ string, _, _ runtime.Object,
 func (d *dummyStorage) Delete(_ context.Context, _ string, _ runtime.Object, _ *storage.Preconditions, _ storage.ValidateObjectFunc, _ runtime.Object) error {
 	return fmt.Errorf("unimplemented")
 }
-func (d *dummyStorage) Watch(_ context.Context, _ string, _ storage.ListOptions) (watch.Interface, error) {
+func (d *dummyStorage) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
+	if d.watchFn != nil {
+		return d.watchFn(ctx, key, opts)
+	}
 	d.RLock()
 	defer d.RUnlock()
 
@@ -447,7 +451,7 @@ func TestWatcherNotGoingBackInTime(t *testing.T) {
 	}
 }
 
-func TestCacheDontAcceptRequestsStopped(t *testing.T) {
+func TestCacherDontAcceptRequestsStopped(t *testing.T) {
 	backingStorage := &dummyStorage{}
 	cacher, _, err := newTestCacher(backingStorage)
 	if err != nil {
@@ -509,6 +513,117 @@ func TestCacheDontAcceptRequestsStopped(t *testing.T) {
 	}
 }
 
+func TestCacherDontMissEventsOnReinitialization(t *testing.T) {
+	makePod := func(i int) *example.Pod {
+		return &example.Pod{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:            fmt.Sprintf("pod-%d", i),
+				Namespace:       "ns",
+				ResourceVersion: fmt.Sprintf("%d", i),
+			},
+		}
+	}
+
+	listCalls, watchCalls := 0, 0
+	backingStorage := &dummyStorage{
+		getListFn: func(_ context.Context, _ string, _ storage.ListOptions, listObj runtime.Object) error {
+			podList := listObj.(*example.PodList)
+			var err error
+			switch listCalls {
+			case 0:
+				podList.ListMeta = metav1.ListMeta{ResourceVersion: "1"}
+			case 1:
+				podList.ListMeta = metav1.ListMeta{ResourceVersion: "10"}
+			default:
+				err = fmt.Errorf("unexpected list call")
+			}
+			listCalls++
+			return err
+		},
+		watchFn: func(_ context.Context, _ string, _ storage.ListOptions) (watch.Interface, error) {
+			var w *watch.FakeWatcher
+			var err error
+			switch watchCalls {
+			case 0:
+				w = watch.NewFakeWithChanSize(10, false)
+				for i := 2; i < 8; i++ {
+					w.Add(makePod(i))
+				}
+				// Emit an error to force relisting.
+				w.Error(nil)
+				w.Stop()
+			case 1:
+				w = watch.NewFakeWithChanSize(10, false)
+				for i := 12; i < 18; i++ {
+					w.Add(makePod(i))
+				}
+				w.Stop()
+			default:
+				err = fmt.Errorf("unexpected watch call")
+			}
+			watchCalls++
+			return w, err
+		},
+	}
+	cacher, _, err := newTestCacher(backingStorage)
+	if err != nil {
+		t.Fatalf("Couldn't create cacher: %v", err)
+	}
+	defer cacher.Stop()
+
+	concurrency := 1000
+	wg := sync.WaitGroup{}
+	wg.Add(concurrency)
+
+	// Ensure that the test doesn't deadlock if the cacher already processed everything
+	// and got back into Pending state before some watches get called.
+	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
+	defer cancel()
+
+	errCh := make(chan error, concurrency)
+	for i := 0; i < concurrency; i++ {
+		go func() {
+			defer wg.Done()
+			w, err := cacher.Watch(ctx, "pods", storage.ListOptions{ResourceVersion: "1", Predicate: storage.Everything})
+			if err != nil {
+				// Watch failed to initialize (this most probably means that the cacher
+				// already moved back to Pending state before the watch initialized).
+				// Ignore this case.
+				return
+			}
+			defer w.Stop()
+
+			prevRV := -1
+			for event := range w.ResultChan() {
+				if event.Type == watch.Error {
+					break
+				}
+				object := event.Object
+				if co, ok := object.(runtime.CacheableObject); ok {
+					object = co.GetObject()
+				}
+				rv, err := strconv.Atoi(object.(*example.Pod).ResourceVersion)
+				if err != nil {
+					errCh <- fmt.Errorf("incorrect resource version: %v", err)
+					return
+				}
+				if prevRV != -1 && prevRV+1 != rv {
+					errCh <- fmt.Errorf("unexpected event received, prevRV=%d, rv=%d", prevRV, rv)
+					return
+				}
+				prevRV = rv
+			}
+
+		}()
+	}
+	wg.Wait()
+	close(errCh)
+
+	for err := range errCh {
+		t.Error(err)
+	}
+}
+
 func TestCacherNoLeakWithMultipleWatchers(t *testing.T) {
 	backingStorage := &dummyStorage{}
 	cacher, _, err := newTestCacher(backingStorage)
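The test's fake backend builds on apimachinery's watch.FakeWatcher: NewFakeWithChanSize returns a watcher whose result channel is buffered, so events can be queued before the cacher consumes them, and an Error event is what pushes the cacher back to Pending and forces the relist the test needs. Roughly, assuming a module that depends on k8s.io/apimachinery (nil objects used purely for illustration):

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/watch"
)

func main() {
	// Buffered fake watcher: events can be queued before anyone consumes them.
	w := watch.NewFakeWithChanSize(3, false)
	w.Add(nil)   // ADDED event (nil object, purely for illustration)
	w.Error(nil) // ERROR event; the cacher reacts to this by relisting
	w.Stop()     // closes the result channel so the range below terminates

	for ev := range w.ResultChan() {
		fmt.Println(ev.Type) // prints ADDED, then ERROR
	}
}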
@@ -39,7 +39,8 @@ const (
 // └---------------------------┘
 type ready struct {
 	state       status        // represent the state of the variable
-	lock        sync.RWMutex  // protect the state variable
+	generation  int           // represent the number of times we have transitioned to ready
+	lock        sync.RWMutex  // protect the state and generation variables
 	restartLock sync.Mutex    // protect the transition from ready to pending where the channel is recreated
 	waitCh      chan struct{} // blocks until is ready or stopped
 }
@@ -60,11 +61,18 @@ func (r *ready) done() chan struct{} {
 
 // wait blocks until it is Ready or Stopped, it returns an error if is Stopped.
 func (r *ready) wait(ctx context.Context) error {
+	_, err := r.waitAndReadGeneration(ctx)
+	return err
+}
+
+// waitAndReadGeneration blocks until it is Ready or Stopped and returns the number
+// of times we entered the ready state if Ready, and an error otherwise.
+func (r *ready) waitAndReadGeneration(ctx context.Context) (int, error) {
 	for {
 		// r.done() only blocks if state is Pending
 		select {
 		case <-ctx.Done():
-			return ctx.Err()
+			return 0, ctx.Err()
 		case <-r.done():
 		}
 
@@ -79,23 +87,30 @@ func (r *ready) wait(ctx context.Context) error {
 			// waiting despite the state moved back to Pending.
 			r.lock.RUnlock()
 		case Ready:
+			generation := r.generation
 			r.lock.RUnlock()
-			return nil
+			return generation, nil
 		case Stopped:
 			r.lock.RUnlock()
-			return fmt.Errorf("apiserver cacher is stopped")
+			return 0, fmt.Errorf("apiserver cacher is stopped")
 		default:
 			r.lock.RUnlock()
-			return fmt.Errorf("unexpected apiserver cache state: %v", r.state)
+			return 0, fmt.Errorf("unexpected apiserver cache state: %v", r.state)
 		}
 	}
 }
 
 // check returns true only if it is Ready.
 func (r *ready) check() bool {
+	_, ok := r.checkAndReadGeneration()
+	return ok
+}
+
+// checkAndReadGeneration returns the current generation and whether it is Ready.
+func (r *ready) checkAndReadGeneration() (int, bool) {
 	r.lock.RLock()
 	defer r.lock.RUnlock()
-	return r.state == Ready
+	return r.generation, r.state == Ready
 }
 
 // set the state to Pending (false) or Ready (true), it does not have effect if the state is Stopped.
@@ -107,6 +122,7 @@ func (r *ready) set(ok bool) {
 	}
 	if ok && r.state == Pending {
 		r.state = Ready
+		r.generation++
 		select {
 		case <-r.waitCh:
 		default:
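Note the invariant the Test_newReadySetIdempotent changes below pin down: generation is bumped under the same write lock (r.lock) that flips state to Ready, and only on a genuine Pending-to-Ready transition, so repeated set(true) calls leave it unchanged, while waitAndReadGeneration and checkAndReadGeneration read state and generation atomically. That is exactly what lets a watcher detect that a reinitialization slipped in between waiting for readiness and registering.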
@@ -52,9 +52,18 @@ func Test_newReadySetIdempotent(t *testing.T) {
 	ready.set(false)
 	ready.set(false)
 	ready.set(false)
+	if generation, ok := ready.checkAndReadGeneration(); generation != 0 || ok {
+		t.Errorf("unexpected state: generation=%v ready=%v", generation, ok)
+	}
+	ready.set(true)
+	if generation, ok := ready.checkAndReadGeneration(); generation != 1 || !ok {
+		t.Errorf("unexpected state: generation=%v ready=%v", generation, ok)
+	}
 	ready.set(true)
 	ready.set(true)
-	ready.set(true)
+	if generation, ok := ready.checkAndReadGeneration(); generation != 1 || !ok {
+		t.Errorf("unexpected state: generation=%v ready=%v", generation, ok)
+	}
 	ready.set(false)
 	// create 10 goroutines waiting for ready and stop
 	for i := 0; i < 10; i++ {
@@ -68,6 +77,9 @@ func Test_newReadySetIdempotent(t *testing.T) {
 		t.Errorf("ready should be blocking")
 	}
 	ready.set(true)
+	if generation, ok := ready.checkAndReadGeneration(); generation != 2 || !ok {
+		t.Errorf("unexpected state: generation=%v ready=%v", generation, ok)
+	}
 	for i := 0; i < 10; i++ {
 		if err := <-errCh; err != nil {
 			t.Errorf("unexpected error on channel %d", i)