mirror of
https://github.com/kubernetes/client-go.git
synced 2025-12-26 14:42:27 +00:00
Fix client watch reestablishment handling of client-side timeouts
Kubernetes-commit: ea49e94be8a8b5285e1ea737df0bfd83e8b5158f
This commit is contained in:
committed by
Kubernetes Publisher
parent
88ba1c9b80
commit
dbcb4c305c
@@ -655,7 +655,7 @@ func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
|
||||
if err != nil {
|
||||
// The watch stream mechanism handles many common partial data errors, so closed
|
||||
// connections can be retried in many cases.
|
||||
if net.IsProbableEOF(err) {
|
||||
if net.IsProbableEOF(err) || net.IsTimeout(err) {
|
||||
return watch.NewEmptyWatch(), nil
|
||||
}
|
||||
return nil, err
|
||||
|
||||
7
tools/cache/reflector.go
vendored
7
tools/cache/reflector.go
vendored
@@ -364,6 +364,8 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
|
||||
AllowWatchBookmarks: true,
|
||||
}
|
||||
|
||||
// start the clock before sending the request, since some proxies won't flush headers until after the first watch event is sent
|
||||
start := r.clock.Now()
|
||||
w, err := r.listerWatcher.Watch(options)
|
||||
if err != nil {
|
||||
switch {
|
||||
@@ -390,7 +392,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
|
||||
if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil {
|
||||
if err != errorStopRequested {
|
||||
switch {
|
||||
case isExpiredError(err):
|
||||
@@ -417,8 +419,7 @@ func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) err
|
||||
}
|
||||
|
||||
// watchHandler watches w and keeps *resourceVersion up to date.
|
||||
func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
|
||||
start := r.clock.Now()
|
||||
func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
|
||||
eventCount := 0
|
||||
|
||||
// Stopping the watcher should be idempotent and if we return from this function there's no way
|
||||
|
||||
6
tools/cache/reflector_test.go
vendored
6
tools/cache/reflector_test.go
vendored
@@ -136,7 +136,7 @@ func TestReflectorWatchHandlerError(t *testing.T) {
|
||||
fw.Stop()
|
||||
}()
|
||||
var resumeRV string
|
||||
err := g.watchHandler(fw, &resumeRV, nevererrc, wait.NeverStop)
|
||||
err := g.watchHandler(time.Now(), fw, &resumeRV, nevererrc, wait.NeverStop)
|
||||
if err == nil {
|
||||
t.Errorf("unexpected non-error")
|
||||
}
|
||||
@@ -156,7 +156,7 @@ func TestReflectorWatchHandler(t *testing.T) {
|
||||
fw.Stop()
|
||||
}()
|
||||
var resumeRV string
|
||||
err := g.watchHandler(fw, &resumeRV, nevererrc, wait.NeverStop)
|
||||
err := g.watchHandler(time.Now(), fw, &resumeRV, nevererrc, wait.NeverStop)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error %v", err)
|
||||
}
|
||||
@@ -205,7 +205,7 @@ func TestReflectorStopWatch(t *testing.T) {
|
||||
var resumeRV string
|
||||
stopWatch := make(chan struct{}, 1)
|
||||
stopWatch <- struct{}{}
|
||||
err := g.watchHandler(fw, &resumeRV, nevererrc, stopWatch)
|
||||
err := g.watchHandler(time.Now(), fw, &resumeRV, nevererrc, stopWatch)
|
||||
if err != errorStopRequested {
|
||||
t.Errorf("expected stop error, got %q", err)
|
||||
}
|
||||
|
||||
@@ -120,7 +120,7 @@ func (rw *RetryWatcher) doReceive() (bool, time.Duration) {
|
||||
|
||||
default:
|
||||
msg := "Watch failed: %v"
|
||||
if net.IsProbableEOF(err) {
|
||||
if net.IsProbableEOF(err) || net.IsTimeout(err) {
|
||||
klog.V(5).Infof(msg, err)
|
||||
// Retry
|
||||
return false, 0
|
||||
|
||||
Reference in New Issue
Block a user