add timeout support for watch
parent 312eb890e6
commit c676e234cc
@@ -249,6 +249,8 @@ func ListResource(r rest.Lister, rw rest.Watcher, scope *RequestScope, forceWatc
 		}
 		klog.V(3).Infof("Starting watch for %s, rv=%s labels=%s fields=%s timeout=%s", req.URL.Path, opts.ResourceVersion, opts.LabelSelector, opts.FieldSelector, timeout)

+		ctx, cancel := context.WithTimeout(ctx, timeout)
+		defer cancel()
 		watcher, err := rw.Watch(ctx, &opts)
 		if err != nil {
 			scope.err(err, w, req)
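For reference, the handler change follows the standard context.WithTimeout pattern: derive a child context bounded by the request timeout, always defer cancel, and pass the derived context to the watch call so it is abandoned once the deadline passes. A minimal standalone sketch of that pattern (slowWatch is a hypothetical stand-in for rw.Watch, not part of this change):

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// slowWatch is a hypothetical stand-in for rw.Watch: it blocks until the
// context is done or a long operation completes, and reports why it stopped.
func slowWatch(ctx context.Context) error {
	select {
	case <-time.After(5 * time.Second): // pretend the watch outlives the request
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	// Same shape as the handler change: wrap the incoming context with a
	// timeout and always cancel to release the timer resources.
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()

	if err := slowWatch(ctx); errors.Is(err, context.DeadlineExceeded) {
		fmt.Println("watch ended by timeout:", err)
	}
}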
@@ -19,6 +19,7 @@ package cacher
 import (
 	"context"
 	"fmt"
+	"math"
 	"net/http"
 	"reflect"
 	"sync"
@@ -363,11 +364,16 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string,
 		chanSize = 1000
 	}

+	// Determine watch timeout
+	timeout := time.Duration(math.MaxInt64)
+	if deadline, ok := ctx.Deadline(); ok {
+		timeout = deadline.Sub(time.Now())
+	}
 	// Create a watcher here to reduce memory allocations under lock,
 	// given that memory allocation may trigger GC and block the thread.
 	// Also note that emptyFunc is a placeholder, until we will be able
 	// to compute watcher.forget function (which has to happen under lock).
-	watcher := newCacheWatcher(chanSize, filterWithAttrsFunction(key, pred), emptyFunc, c.versioner)
+	watcher := newCacheWatcher(chanSize, filterWithAttrsFunction(key, pred), emptyFunc, c.versioner, timeout)

 	// We explicitly use thread unsafe version and do locking ourself to ensure that
 	// no new events will be processed in the meantime. The watchCache will be unlocked
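The block added above converts the context deadline into a duration, falling back to time.Duration(math.MaxInt64) when no deadline is set. A small self-contained sketch of the same conversion (the watchTimeout helper name is illustrative only, not part of the change):

package main

import (
	"context"
	"fmt"
	"math"
	"time"
)

// watchTimeout mirrors the logic added to Cacher.Watch: use the context
// deadline if one is set, otherwise fall back to a practically infinite value.
func watchTimeout(ctx context.Context) time.Duration {
	timeout := time.Duration(math.MaxInt64)
	if deadline, ok := ctx.Deadline(); ok {
		timeout = deadline.Sub(time.Now())
	}
	return timeout
}

func main() {
	// No deadline: the sentinel maximum duration is returned.
	fmt.Println(watchTimeout(context.Background()))

	// With a deadline: roughly the remaining time until it expires.
	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()
	fmt.Println(watchTimeout(ctx))
}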
@@ -400,7 +406,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string,
 		c.watcherIdx++
 	}()

-	go watcher.process(initEvents, watchRV)
+	go watcher.process(ctx, initEvents, watchRV)
 	return watcher, nil
 }

@@ -887,9 +893,34 @@ type cacheWatcher struct {
 	stopped   bool
 	forget    func()
 	versioner storage.Versioner
+	timer     *time.Timer
 }

-func newCacheWatcher(chanSize int, filter filterWithAttrsFunc, forget func(), versioner storage.Versioner) *cacheWatcher {
+var timerPool sync.Pool
+
+func newTimer(d time.Duration) *time.Timer {
+	t, ok := timerPool.Get().(*time.Timer)
+	if ok {
+		t.Reset(d)
+	} else {
+		t = time.NewTimer(d)
+	}
+	return t
+}
+
+func freeTimer(timer *time.Timer) {
+	if !timer.Stop() {
+		// Consume triggered (but not yet received) timer event
+		// so that future reuse does not get a spurious timeout.
+		select {
+		case <-timer.C:
+		default:
+		}
+	}
+	timerPool.Put(timer)
+}
+
+func newCacheWatcher(chanSize int, filter filterWithAttrsFunc, forget func(), versioner storage.Versioner, timeout time.Duration) *cacheWatcher {
 	return &cacheWatcher{
 		input:  make(chan *watchCacheEvent, chanSize),
 		result: make(chan watch.Event, chanSize),
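newTimer and freeTimer implement a pooled-timer pattern: reuse *time.Timer values through a sync.Pool and, when stopping a timer that may already have fired, drain its channel so the next Reset does not observe a stale tick. A standalone sketch of the same pattern, independent of the cacher types (getTimer and putTimer are illustrative names, not the functions added here):

package main

import (
	"fmt"
	"sync"
	"time"
)

var timerPool sync.Pool

// getTimer returns a pooled timer reset to d, or a fresh one if the pool is empty.
func getTimer(d time.Duration) *time.Timer {
	if t, ok := timerPool.Get().(*time.Timer); ok {
		t.Reset(d)
		return t
	}
	return time.NewTimer(d)
}

// putTimer stops the timer, drains any already-fired tick, and returns it to the pool.
func putTimer(t *time.Timer) {
	if !t.Stop() {
		select {
		case <-t.C: // consume the pending tick so reuse does not see a spurious timeout
		default:
		}
	}
	timerPool.Put(t)
}

func main() {
	t := getTimer(10 * time.Millisecond)
	<-t.C // timer fires
	putTimer(t)

	t = getTimer(10 * time.Millisecond) // likely the same timer, reused
	<-t.C
	putTimer(t)
	fmt.Println("pooled timer fired twice without leaking")
}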
@@ -898,6 +929,7 @@ func newCacheWatcher(chanSize int, filter filterWithAttrsFunc, forget func(), ve
 		stopped:   false,
 		forget:    forget,
 		versioner: versioner,
+		timer:     newTimer(timeout),
 	}
 }

@@ -918,6 +950,7 @@ func (c *cacheWatcher) stop() {
 		c.stopped = true
 		close(c.done)
 		close(c.input)
+		freeTimer(c.timer)
 	}
 }

@@ -1006,7 +1039,7 @@ func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) {
 	}
 }

-func (c *cacheWatcher) process(initEvents []*watchCacheEvent, resourceVersion uint64) {
+func (c *cacheWatcher) process(ctx context.Context, initEvents []*watchCacheEvent, resourceVersion uint64) {
 	defer utilruntime.HandleCrash()

 	// Check how long we are processing initEvents.
@@ -1042,11 +1075,21 @@ func (c *cacheWatcher) process(initEvents []*watchCacheEvent, resourceVersion ui

 	defer close(c.result)
 	defer c.Stop()
-	for event := range c.input {
-		// only send events newer than resourceVersion
-		if event.ResourceVersion > resourceVersion {
-			c.sendWatchCacheEvent(event)
-		}
+	for {
+		select {
+		case event, ok := <-c.input:
+			if !ok {
+				return
+			}
+			// only send events newer than resourceVersion
+			if event.ResourceVersion > resourceVersion {
+				c.sendWatchCacheEvent(event)
+			}
+		case <-ctx.Done():
+			return
+		case <-c.timer.C:
+			return
+		}
 	}
 }

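The reworked loop replaces the plain range over c.input with a select over three exit conditions: the input channel closing, the request context being cancelled, and the per-watch timer firing. A minimal sketch of that shape (the run helper and int-valued events are illustrative only):

package main

import (
	"context"
	"fmt"
	"time"
)

// run drains events until the input closes, the context is cancelled,
// or the timer fires -- the same three exit paths as the loop above.
func run(ctx context.Context, input <-chan int, timer *time.Timer) string {
	for {
		select {
		case ev, ok := <-input:
			if !ok {
				return "input closed"
			}
			fmt.Println("got event", ev)
		case <-ctx.Done():
			return "context done"
		case <-timer.C:
			return "timed out"
		}
	}
}

func main() {
	input := make(chan int, 2)
	input <- 1
	input <- 2

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// With a short timer and no further events, the loop exits via the timer branch.
	fmt.Println(run(ctx, input, time.NewTimer(50*time.Millisecond)))
}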
@@ -19,6 +19,7 @@ package cacher
 import (
 	"context"
 	"fmt"
+	"math"
 	"reflect"
 	"strconv"
 	"sync"
@@ -63,8 +64,8 @@ func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) {
 	}
 	// set the size of the buffer of w.result to 0, so that the writes to
 	// w.result is blocked.
-	w = newCacheWatcher(0, filter, forget, testVersioner{})
-	go w.process(initEvents, 0)
+	w = newCacheWatcher(0, filter, forget, testVersioner{}, time.Duration(math.MaxInt64))
+	go w.process(context.Background(), initEvents, 0)
 	w.Stop()
 	if err := wait.PollImmediate(1*time.Second, 5*time.Second, func() (bool, error) {
 		lock.RLock()
@@ -182,8 +183,9 @@ TestCase:
 		for j := range testCase.events {
 			testCase.events[j].ResourceVersion = uint64(j) + 1
 		}
-		w := newCacheWatcher(0, filter, forget, testVersioner{})
-		go w.process(testCase.events, 0)
+
+		w := newCacheWatcher(0, filter, forget, testVersioner{}, time.Duration(math.MaxInt64))
+		go w.process(context.Background(), testCase.events, 0)
 		ch := w.ResultChan()
 		for j, event := range testCase.expected {
 			e := <-ch
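In both test updates, time.Duration(math.MaxInt64) is passed as the timeout so the new timer branch cannot fire during the test. As a rough sanity check of that assumption, the largest representable duration is on the order of centuries:

package main

import (
	"fmt"
	"math"
	"time"
)

func main() {
	// time.Duration is an int64 of nanoseconds, so the largest representable
	// value is roughly 292 years -- far beyond any test run.
	maxTimeout := time.Duration(math.MaxInt64)
	fmt.Println(maxTimeout)
	fmt.Printf("%.0f years\n", maxTimeout.Hours()/24/365)
}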