From c676e234cc1036a2a1147f2eeef9c89468671330 Mon Sep 17 00:00:00 2001 From: "fansong.cfs" Date: Sat, 30 Mar 2019 20:07:41 +0800 Subject: [PATCH] add timeout support for watch --- .../apiserver/pkg/endpoints/handlers/get.go | 2 + .../apiserver/pkg/storage/cacher/cacher.go | 59 ++++++++++++++++--- .../storage/cacher/cacher_whitebox_test.go | 10 ++-- 3 files changed, 59 insertions(+), 12 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/get.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/get.go index d6757730bc4..0b232cd1ac5 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/get.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/get.go @@ -249,6 +249,8 @@ func ListResource(r rest.Lister, rw rest.Watcher, scope *RequestScope, forceWatc } klog.V(3).Infof("Starting watch for %s, rv=%s labels=%s fields=%s timeout=%s", req.URL.Path, opts.ResourceVersion, opts.LabelSelector, opts.FieldSelector, timeout) + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() watcher, err := rw.Watch(ctx, &opts) if err != nil { scope.err(err, w, req) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index 320c43c125a..65ae8ab6801 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -19,6 +19,7 @@ package cacher import ( "context" "fmt" + "math" "net/http" "reflect" "sync" @@ -363,11 +364,16 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string, chanSize = 1000 } + // Determine watch timeout + timeout := time.Duration(math.MaxInt64) + if deadline, ok := ctx.Deadline(); ok { + timeout = deadline.Sub(time.Now()) + } // Create a watcher here to reduce memory allocations under lock, // given that memory allocation may trigger GC and block the thread. 
// Also note that emptyFunc is a placeholder, until we will be able // to compute watcher.forget function (which has to happen under lock). - watcher := newCacheWatcher(chanSize, filterWithAttrsFunction(key, pred), emptyFunc, c.versioner) + watcher := newCacheWatcher(chanSize, filterWithAttrsFunction(key, pred), emptyFunc, c.versioner, timeout) // We explicitly use thread unsafe version and do locking ourself to ensure that // no new events will be processed in the meantime. The watchCache will be unlocked @@ -400,7 +406,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string, c.watcherIdx++ }() - go watcher.process(initEvents, watchRV) + go watcher.process(ctx, initEvents, watchRV) return watcher, nil } @@ -887,9 +893,34 @@ type cacheWatcher struct { stopped bool forget func() versioner storage.Versioner + timer *time.Timer } -func newCacheWatcher(chanSize int, filter filterWithAttrsFunc, forget func(), versioner storage.Versioner) *cacheWatcher { +var timerPool sync.Pool + +func newTimer(d time.Duration) *time.Timer { + t, ok := timerPool.Get().(*time.Timer) + if ok { + t.Reset(d) + } else { + t = time.NewTimer(d) + } + return t +} + +func freeTimer(timer *time.Timer) { + if !timer.Stop() { + // Consume triggered (but not yet received) timer event + // so that future reuse does not get a spurious timeout. 
+ select { + case <-timer.C: + default: + } + } + timerPool.Put(timer) +} + +func newCacheWatcher(chanSize int, filter filterWithAttrsFunc, forget func(), versioner storage.Versioner, timeout time.Duration) *cacheWatcher { return &cacheWatcher{ input: make(chan *watchCacheEvent, chanSize), result: make(chan watch.Event, chanSize), @@ -898,6 +929,7 @@ func newCacheWatcher(chanSize int, filter filterWithAttrsFunc, forget func(), ve stopped: false, forget: forget, versioner: versioner, + timer: newTimer(timeout), } } @@ -918,6 +950,7 @@ func (c *cacheWatcher) stop() { c.stopped = true close(c.done) close(c.input) + freeTimer(c.timer) } } @@ -1006,7 +1039,7 @@ func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) { } } -func (c *cacheWatcher) process(initEvents []*watchCacheEvent, resourceVersion uint64) { +func (c *cacheWatcher) process(ctx context.Context, initEvents []*watchCacheEvent, resourceVersion uint64) { defer utilruntime.HandleCrash() // Check how long we are processing initEvents. 
@@ -1042,10 +1075,20 @@ func (c *cacheWatcher) process(initEvents []*watchCacheEvent, resourceVersion ui defer close(c.result) defer c.Stop() - for event := range c.input { - // only send events newer than resourceVersion - if event.ResourceVersion > resourceVersion { - c.sendWatchCacheEvent(event) + for { + select { + case event, ok := <-c.input: + if !ok { + return + } + // only send events newer than resourceVersion + if event.ResourceVersion > resourceVersion { + c.sendWatchCacheEvent(event) + } + case <-ctx.Done(): + return + case <-c.timer.C: + return } } } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go index f102095f05d..b529d518608 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go @@ -19,6 +19,7 @@ package cacher import ( "context" "fmt" + "math" "reflect" "strconv" "sync" @@ -63,8 +64,8 @@ func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) { } // set the size of the buffer of w.result to 0, so that the writes to // w.result is blocked. - w = newCacheWatcher(0, filter, forget, testVersioner{}) - go w.process(initEvents, 0) + w = newCacheWatcher(0, filter, forget, testVersioner{}, time.Duration(math.MaxInt64)) + go w.process(context.Background(), initEvents, 0) w.Stop() if err := wait.PollImmediate(1*time.Second, 5*time.Second, func() (bool, error) { lock.RLock() @@ -182,8 +183,9 @@ TestCase: for j := range testCase.events { testCase.events[j].ResourceVersion = uint64(j) + 1 } - w := newCacheWatcher(0, filter, forget, testVersioner{}) - go w.process(testCase.events, 0) + + w := newCacheWatcher(0, filter, forget, testVersioner{}, time.Duration(math.MaxInt64)) + go w.process(context.Background(), testCase.events, 0) ch := w.ResultChan() for j, event := range testCase.expected { e := <-ch