Compare commits

..

3 Commits

Author SHA1 Message Date
Kubernetes Publisher
5d8f446394 Update dependencies to v0.16.10-beta.0 tag 2020-04-17 02:27:51 +00:00
Kubernetes Publisher
c94387a2ca Merge pull request #89977 from liggitt/relist-timeout-1.16
Manual cherry pick of #89652: Fix client watch reestablishment handling of client-side timeouts

Kubernetes-commit: e4fa40f60f2668ec55a4247cabf0463dc0153522
2020-04-10 02:32:34 +00:00
Jordan Liggitt
45223f1cdc Fix client watch reestablishment handling of client-side timeouts
Kubernetes-commit: a0f1c874d42fe79d4d6eb9108df381f10f660d14
2020-03-30 10:36:01 -04:00
7 changed files with 17 additions and 16 deletions

4
Godeps/Godeps.json generated
View File

@@ -348,11 +348,11 @@
},
{
"ImportPath": "k8s.io/api",
"Rev": "v0.16.9-beta.0"
"Rev": "v0.16.10-beta.0"
},
{
"ImportPath": "k8s.io/apimachinery",
"Rev": "v0.16.9-beta.0"
"Rev": "v0.16.10-beta.0"
},
{
"ImportPath": "k8s.io/gengo",

8
go.mod
View File

@@ -26,8 +26,8 @@ require (
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4
google.golang.org/appengine v1.5.0 // indirect
k8s.io/api v0.16.9-beta.0
k8s.io/apimachinery v0.16.9-beta.0
k8s.io/api v0.16.10-beta.0
k8s.io/apimachinery v0.16.10-beta.0
k8s.io/klog v1.0.0
k8s.io/utils v0.0.0-20190801114015-581e00157fb1
sigs.k8s.io/yaml v1.1.0
@@ -35,6 +35,6 @@ require (
replace (
golang.org/x/tools => golang.org/x/tools v0.0.0-20190821162956-65e3620a7ae7
k8s.io/api => k8s.io/api v0.16.9-beta.0
k8s.io/apimachinery => k8s.io/apimachinery v0.16.9-beta.0
k8s.io/api => k8s.io/api v0.16.10-beta.0
k8s.io/apimachinery => k8s.io/apimachinery v0.16.10-beta.0
)

4
go.sum
View File

@@ -196,8 +196,8 @@ gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
k8s.io/api v0.16.9-beta.0/go.mod h1:0suM9/Z/L80VZNc6zUkH/hC/QdnUrot25yRmsrJhzkM=
k8s.io/apimachinery v0.16.9-beta.0/go.mod h1:Xk2vD2TRRpuWYLQNM6lT9R7DSFZUYG03SarNkbGrnKE=
k8s.io/api v0.16.10-beta.0/go.mod h1:zKCb+ka/7VgqCBWk6KhCmB+4+S0eSzHvwTvDZs9h3+s=
k8s.io/apimachinery v0.16.10-beta.0/go.mod h1:Xk2vD2TRRpuWYLQNM6lT9R7DSFZUYG03SarNkbGrnKE=
k8s.io/gengo v0.0.0-20190128074634-0689ccc1d7d6/go.mod h1:ezvh/TsK7cY6rbqRK0oQQ8IAqLxYwwyPxAX1Pzy0ii0=
k8s.io/klog v0.0.0-20181102134211-b9b56d5dfc92/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk=
k8s.io/klog v0.3.0/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk=

View File

@@ -592,7 +592,7 @@ func (r *Request) WatchWithSpecificDecoders(wrapperDecoderFn func(io.ReadCloser)
if err != nil {
// The watch stream mechanism handles many common partial data errors, so closed
// connections can be retried in many cases.
if net.IsProbableEOF(err) {
if net.IsProbableEOF(err) || net.IsTimeout(err) {
return watch.NewEmptyWatch(), nil
}
return nil, err

View File

@@ -269,6 +269,8 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
AllowWatchBookmarks: true,
}
// start the clock before sending the request, since some proxies won't flush headers until after the first watch event is sent
start := r.clock.Now()
w, err := r.listerWatcher.Watch(options)
if err != nil {
switch err {
@@ -290,7 +292,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
return nil
}
if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil {
if err != errorStopRequested {
switch {
case apierrs.IsResourceExpired(err):
@@ -314,8 +316,7 @@ func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) err
}
// watchHandler watches w and keeps *resourceVersion up to date.
func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
start := r.clock.Now()
func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
eventCount := 0
// Stopping the watcher should be idempotent and if we return from this function there's no way

View File

@@ -132,7 +132,7 @@ func TestReflectorWatchHandlerError(t *testing.T) {
fw.Stop()
}()
var resumeRV string
err := g.watchHandler(fw, &resumeRV, nevererrc, wait.NeverStop)
err := g.watchHandler(time.Now(), fw, &resumeRV, nevererrc, wait.NeverStop)
if err == nil {
t.Errorf("unexpected non-error")
}
@@ -152,7 +152,7 @@ func TestReflectorWatchHandler(t *testing.T) {
fw.Stop()
}()
var resumeRV string
err := g.watchHandler(fw, &resumeRV, nevererrc, wait.NeverStop)
err := g.watchHandler(time.Now(), fw, &resumeRV, nevererrc, wait.NeverStop)
if err != nil {
t.Errorf("unexpected error %v", err)
}
@@ -201,7 +201,7 @@ func TestReflectorStopWatch(t *testing.T) {
var resumeRV string
stopWatch := make(chan struct{}, 1)
stopWatch <- struct{}{}
err := g.watchHandler(fw, &resumeRV, nevererrc, stopWatch)
err := g.watchHandler(time.Now(), fw, &resumeRV, nevererrc, stopWatch)
if err != errorStopRequested {
t.Errorf("expected stop error, got %q", err)
}

View File

@@ -120,7 +120,7 @@ func (rw *RetryWatcher) doReceive() (bool, time.Duration) {
default:
msg := "Watch failed: %v"
if net.IsProbableEOF(err) {
if net.IsProbableEOF(err) || net.IsTimeout(err) {
klog.V(5).Infof(msg, err)
// Retry
return false, 0