mirror of
https://github.com/kubernetes/client-go.git
synced 2025-09-17 15:47:48 +00:00
Merge pull request #89652 from liggitt/relist-timeout
Fix client watch reestablishment handling of client-side timeouts Kubernetes-commit: 38f0a8bc74bed92bef87e20a79c36c5feea78b3d
This commit is contained in:
2
Godeps/Godeps.json
generated
2
Godeps/Godeps.json
generated
@@ -352,7 +352,7 @@
|
|||||||
},
|
},
|
||||||
{
|
{
|
||||||
"ImportPath": "k8s.io/apimachinery",
|
"ImportPath": "k8s.io/apimachinery",
|
||||||
"Rev": "fa0d5bf06730"
|
"Rev": "591a38b7a7b7"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"ImportPath": "k8s.io/gengo",
|
"ImportPath": "k8s.io/gengo",
|
||||||
|
4
go.mod
4
go.mod
@@ -29,7 +29,7 @@ require (
|
|||||||
golang.org/x/time v0.0.0-20191024005414-555d28b269f0
|
golang.org/x/time v0.0.0-20191024005414-555d28b269f0
|
||||||
google.golang.org/appengine v1.5.0 // indirect
|
google.golang.org/appengine v1.5.0 // indirect
|
||||||
k8s.io/api v0.0.0-20200403220253-fa879b399cd0
|
k8s.io/api v0.0.0-20200403220253-fa879b399cd0
|
||||||
k8s.io/apimachinery v0.0.0-20200403220105-fa0d5bf06730
|
k8s.io/apimachinery v0.0.0-20200407101112-591a38b7a7b7
|
||||||
k8s.io/klog v1.0.0
|
k8s.io/klog v1.0.0
|
||||||
k8s.io/utils v0.0.0-20200324210504-a9aa75ae1b89
|
k8s.io/utils v0.0.0-20200324210504-a9aa75ae1b89
|
||||||
sigs.k8s.io/yaml v1.2.0
|
sigs.k8s.io/yaml v1.2.0
|
||||||
@@ -39,5 +39,5 @@ replace (
|
|||||||
golang.org/x/sys => golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a // pinned to release-branch.go1.13
|
golang.org/x/sys => golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a // pinned to release-branch.go1.13
|
||||||
golang.org/x/tools => golang.org/x/tools v0.0.0-20190821162956-65e3620a7ae7 // pinned to release-branch.go1.13
|
golang.org/x/tools => golang.org/x/tools v0.0.0-20190821162956-65e3620a7ae7 // pinned to release-branch.go1.13
|
||||||
k8s.io/api => k8s.io/api v0.0.0-20200403220253-fa879b399cd0
|
k8s.io/api => k8s.io/api v0.0.0-20200403220253-fa879b399cd0
|
||||||
k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20200403220105-fa0d5bf06730
|
k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20200407101112-591a38b7a7b7
|
||||||
)
|
)
|
||||||
|
2
go.sum
2
go.sum
@@ -188,7 +188,7 @@ gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
|||||||
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
|
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
|
||||||
honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
|
honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
|
||||||
k8s.io/api v0.0.0-20200403220253-fa879b399cd0/go.mod h1:IHUHI0RKOjAF2/LmT+5g7hlTufmlzdZqccff7saac3A=
|
k8s.io/api v0.0.0-20200403220253-fa879b399cd0/go.mod h1:IHUHI0RKOjAF2/LmT+5g7hlTufmlzdZqccff7saac3A=
|
||||||
k8s.io/apimachinery v0.0.0-20200403220105-fa0d5bf06730/go.mod h1:plsINh5MKcZh761kXB2H+kXnD8vwFIAy7bitct1VPNU=
|
k8s.io/apimachinery v0.0.0-20200407101112-591a38b7a7b7/go.mod h1:plsINh5MKcZh761kXB2H+kXnD8vwFIAy7bitct1VPNU=
|
||||||
k8s.io/gengo v0.0.0-20190128074634-0689ccc1d7d6/go.mod h1:ezvh/TsK7cY6rbqRK0oQQ8IAqLxYwwyPxAX1Pzy0ii0=
|
k8s.io/gengo v0.0.0-20190128074634-0689ccc1d7d6/go.mod h1:ezvh/TsK7cY6rbqRK0oQQ8IAqLxYwwyPxAX1Pzy0ii0=
|
||||||
k8s.io/klog v0.0.0-20181102134211-b9b56d5dfc92/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk=
|
k8s.io/klog v0.0.0-20181102134211-b9b56d5dfc92/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk=
|
||||||
k8s.io/klog v0.3.0/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk=
|
k8s.io/klog v0.3.0/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk=
|
||||||
|
@@ -669,7 +669,7 @@ func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
// The watch stream mechanism handles many common partial data errors, so closed
|
// The watch stream mechanism handles many common partial data errors, so closed
|
||||||
// connections can be retried in many cases.
|
// connections can be retried in many cases.
|
||||||
if net.IsProbableEOF(err) {
|
if net.IsProbableEOF(err) || net.IsTimeout(err) {
|
||||||
return watch.NewEmptyWatch(), nil
|
return watch.NewEmptyWatch(), nil
|
||||||
}
|
}
|
||||||
return nil, err
|
return nil, err
|
||||||
|
7
tools/cache/reflector.go
vendored
7
tools/cache/reflector.go
vendored
@@ -396,6 +396,8 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
|
|||||||
AllowWatchBookmarks: true,
|
AllowWatchBookmarks: true,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// start the clock before sending the request, since some proxies won't flush headers until after the first watch event is sent
|
||||||
|
start := r.clock.Now()
|
||||||
w, err := r.listerWatcher.Watch(options)
|
w, err := r.listerWatcher.Watch(options)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// If this is "connection refused" error, it means that most likely apiserver is not responsive.
|
// If this is "connection refused" error, it means that most likely apiserver is not responsive.
|
||||||
@@ -409,7 +411,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
|
if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil {
|
||||||
if err != errorStopRequested {
|
if err != errorStopRequested {
|
||||||
switch {
|
switch {
|
||||||
case isExpiredError(err):
|
case isExpiredError(err):
|
||||||
@@ -436,8 +438,7 @@ func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) err
|
|||||||
}
|
}
|
||||||
|
|
||||||
// watchHandler watches w and keeps *resourceVersion up to date.
|
// watchHandler watches w and keeps *resourceVersion up to date.
|
||||||
func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
|
func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
|
||||||
start := r.clock.Now()
|
|
||||||
eventCount := 0
|
eventCount := 0
|
||||||
|
|
||||||
// Stopping the watcher should be idempotent and if we return from this function there's no way
|
// Stopping the watcher should be idempotent and if we return from this function there's no way
|
||||||
|
6
tools/cache/reflector_test.go
vendored
6
tools/cache/reflector_test.go
vendored
@@ -136,7 +136,7 @@ func TestReflectorWatchHandlerError(t *testing.T) {
|
|||||||
fw.Stop()
|
fw.Stop()
|
||||||
}()
|
}()
|
||||||
var resumeRV string
|
var resumeRV string
|
||||||
err := g.watchHandler(fw, &resumeRV, nevererrc, wait.NeverStop)
|
err := g.watchHandler(time.Now(), fw, &resumeRV, nevererrc, wait.NeverStop)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
t.Errorf("unexpected non-error")
|
t.Errorf("unexpected non-error")
|
||||||
}
|
}
|
||||||
@@ -156,7 +156,7 @@ func TestReflectorWatchHandler(t *testing.T) {
|
|||||||
fw.Stop()
|
fw.Stop()
|
||||||
}()
|
}()
|
||||||
var resumeRV string
|
var resumeRV string
|
||||||
err := g.watchHandler(fw, &resumeRV, nevererrc, wait.NeverStop)
|
err := g.watchHandler(time.Now(), fw, &resumeRV, nevererrc, wait.NeverStop)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("unexpected error %v", err)
|
t.Errorf("unexpected error %v", err)
|
||||||
}
|
}
|
||||||
@@ -205,7 +205,7 @@ func TestReflectorStopWatch(t *testing.T) {
|
|||||||
var resumeRV string
|
var resumeRV string
|
||||||
stopWatch := make(chan struct{}, 1)
|
stopWatch := make(chan struct{}, 1)
|
||||||
stopWatch <- struct{}{}
|
stopWatch <- struct{}{}
|
||||||
err := g.watchHandler(fw, &resumeRV, nevererrc, stopWatch)
|
err := g.watchHandler(time.Now(), fw, &resumeRV, nevererrc, stopWatch)
|
||||||
if err != errorStopRequested {
|
if err != errorStopRequested {
|
||||||
t.Errorf("expected stop error, got %q", err)
|
t.Errorf("expected stop error, got %q", err)
|
||||||
}
|
}
|
||||||
|
@@ -120,7 +120,7 @@ func (rw *RetryWatcher) doReceive() (bool, time.Duration) {
|
|||||||
|
|
||||||
default:
|
default:
|
||||||
msg := "Watch failed: %v"
|
msg := "Watch failed: %v"
|
||||||
if net.IsProbableEOF(err) {
|
if net.IsProbableEOF(err) || net.IsTimeout(err) {
|
||||||
klog.V(5).Infof(msg, err)
|
klog.V(5).Infof(msg, err)
|
||||||
// Retry
|
// Retry
|
||||||
return false, 0
|
return false, 0
|
||||||
|
Reference in New Issue
Block a user