mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-26 21:17:23 +00:00
Merge pull request #76065 from hormes/add_timeout_support_for_watch
add timeout support for watch
This commit is contained in:
commit
72ed03fed4
@ -248,7 +248,8 @@ func ListResource(r rest.Lister, rw rest.Watcher, scope *RequestScope, forceWatc
|
|||||||
timeout = time.Duration(float64(minRequestTimeout) * (rand.Float64() + 1.0))
|
timeout = time.Duration(float64(minRequestTimeout) * (rand.Float64() + 1.0))
|
||||||
}
|
}
|
||||||
klog.V(3).Infof("Starting watch for %s, rv=%s labels=%s fields=%s timeout=%s", req.URL.Path, opts.ResourceVersion, opts.LabelSelector, opts.FieldSelector, timeout)
|
klog.V(3).Infof("Starting watch for %s, rv=%s labels=%s fields=%s timeout=%s", req.URL.Path, opts.ResourceVersion, opts.LabelSelector, opts.FieldSelector, timeout)
|
||||||
|
ctx, cancel := context.WithTimeout(ctx, timeout)
|
||||||
|
defer cancel()
|
||||||
watcher, err := rw.Watch(ctx, &opts)
|
watcher, err := rw.Watch(ctx, &opts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
scope.err(err, w, req)
|
scope.err(err, w, req)
|
||||||
|
@ -45,9 +45,15 @@ func newDecoratedWatcher(w watch.Interface, decorator ObjectFunc) *decoratedWatc
|
|||||||
|
|
||||||
func (d *decoratedWatcher) run(ctx context.Context) {
|
func (d *decoratedWatcher) run(ctx context.Context) {
|
||||||
var recv, send watch.Event
|
var recv, send watch.Event
|
||||||
|
var ok bool
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case recv = <-d.w.ResultChan():
|
case recv, ok = <-d.w.ResultChan():
|
||||||
|
// The underlying channel may be closed after timeout.
|
||||||
|
if !ok {
|
||||||
|
d.cancel()
|
||||||
|
return
|
||||||
|
}
|
||||||
switch recv.Type {
|
switch recv.Type {
|
||||||
case watch.Added, watch.Modified, watch.Deleted:
|
case watch.Added, watch.Modified, watch.Deleted:
|
||||||
err := d.decorator(recv.Object)
|
err := d.decorator(recv.Object)
|
||||||
|
@ -413,7 +413,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string,
|
|||||||
c.watcherIdx++
|
c.watcherIdx++
|
||||||
}()
|
}()
|
||||||
|
|
||||||
go watcher.process(initEvents, watchRV)
|
go watcher.process(ctx, initEvents, watchRV)
|
||||||
return watcher, nil
|
return watcher, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1063,7 +1063,7 @@ func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cacheWatcher) process(initEvents []*watchCacheEvent, resourceVersion uint64) {
|
func (c *cacheWatcher) process(ctx context.Context, initEvents []*watchCacheEvent, resourceVersion uint64) {
|
||||||
defer utilruntime.HandleCrash()
|
defer utilruntime.HandleCrash()
|
||||||
|
|
||||||
// Check how long we are processing initEvents.
|
// Check how long we are processing initEvents.
|
||||||
@ -1099,11 +1099,19 @@ func (c *cacheWatcher) process(initEvents []*watchCacheEvent, resourceVersion ui
|
|||||||
|
|
||||||
defer close(c.result)
|
defer close(c.result)
|
||||||
defer c.Stop()
|
defer c.Stop()
|
||||||
for event := range c.input {
|
for {
|
||||||
|
select {
|
||||||
|
case event, ok := <-c.input:
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
// only send events newer than resourceVersion
|
// only send events newer than resourceVersion
|
||||||
if event.ResourceVersion > resourceVersion {
|
if event.ResourceVersion > resourceVersion {
|
||||||
c.sendWatchCacheEvent(event)
|
c.sendWatchCacheEvent(event)
|
||||||
}
|
}
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -64,7 +64,7 @@ func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) {
|
|||||||
// set the size of the buffer of w.result to 0, so that the writes to
|
// set the size of the buffer of w.result to 0, so that the writes to
|
||||||
// w.result is blocked.
|
// w.result is blocked.
|
||||||
w = newCacheWatcher(0, filter, forget, testVersioner{})
|
w = newCacheWatcher(0, filter, forget, testVersioner{})
|
||||||
go w.process(initEvents, 0)
|
go w.process(context.Background(), initEvents, 0)
|
||||||
w.Stop()
|
w.Stop()
|
||||||
if err := wait.PollImmediate(1*time.Second, 5*time.Second, func() (bool, error) {
|
if err := wait.PollImmediate(1*time.Second, 5*time.Second, func() (bool, error) {
|
||||||
lock.RLock()
|
lock.RLock()
|
||||||
@ -182,8 +182,9 @@ TestCase:
|
|||||||
for j := range testCase.events {
|
for j := range testCase.events {
|
||||||
testCase.events[j].ResourceVersion = uint64(j) + 1
|
testCase.events[j].ResourceVersion = uint64(j) + 1
|
||||||
}
|
}
|
||||||
|
|
||||||
w := newCacheWatcher(0, filter, forget, testVersioner{})
|
w := newCacheWatcher(0, filter, forget, testVersioner{})
|
||||||
go w.process(testCase.events, 0)
|
go w.process(context.Background(), testCase.events, 0)
|
||||||
ch := w.ResultChan()
|
ch := w.ResultChan()
|
||||||
for j, event := range testCase.expected {
|
for j, event := range testCase.expected {
|
||||||
e := <-ch
|
e := <-ch
|
||||||
@ -445,3 +446,41 @@ func TestWatcherNotGoingBackInTime(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestCacheWatcherStoppedInAnotherGoroutine(t *testing.T) {
|
||||||
|
var w *cacheWatcher
|
||||||
|
done := make(chan struct{})
|
||||||
|
filter := func(string, labels.Set, fields.Set) bool { return true }
|
||||||
|
forget := func() {
|
||||||
|
w.stop()
|
||||||
|
done <- struct{}{}
|
||||||
|
}
|
||||||
|
|
||||||
|
maxRetriesToProduceTheRaceCondition := 1000
|
||||||
|
// Simulating the timer is fired and stopped concurrently by set time
|
||||||
|
// timeout to zero and run the Stop goroutine concurrently.
|
||||||
|
// May sure that the watch will not be blocked on Stop.
|
||||||
|
for i := 0; i < maxRetriesToProduceTheRaceCondition; i++ {
|
||||||
|
w = newCacheWatcher(0, filter, forget, testVersioner{})
|
||||||
|
go w.Stop()
|
||||||
|
select {
|
||||||
|
case <-done:
|
||||||
|
case <-time.After(time.Second):
|
||||||
|
t.Fatal("stop is blocked when the timer is fired concurrently")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// After that, verifies the cacheWatcher.process goroutine works correctly.
|
||||||
|
for i := 0; i < maxRetriesToProduceTheRaceCondition; i++ {
|
||||||
|
w = newCacheWatcher(2, filter, emptyFunc, testVersioner{})
|
||||||
|
w.input <- &watchCacheEvent{Object: &v1.Pod{}, ResourceVersion: uint64(i + 1)}
|
||||||
|
ctx, _ := context.WithTimeout(context.Background(), time.Hour)
|
||||||
|
go w.process(ctx, nil, 0)
|
||||||
|
select {
|
||||||
|
case <-w.ResultChan():
|
||||||
|
case <-time.After(time.Second):
|
||||||
|
t.Fatal("expected received a event on ResultChan")
|
||||||
|
}
|
||||||
|
w.Stop()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user