diff --git a/staging/src/k8s.io/apimachinery/pkg/util/net/http.go b/staging/src/k8s.io/apimachinery/pkg/util/net/http.go index 0ba586bfe51..7449cbb0a01 100644 --- a/staging/src/k8s.io/apimachinery/pkg/util/net/http.go +++ b/staging/src/k8s.io/apimachinery/pkg/util/net/http.go @@ -55,6 +55,12 @@ func JoinPreservingTrailingSlash(elem ...string) string { return result } +// IsTimeout returns true if the given error is a network timeout error +func IsTimeout(err error) bool { + neterr, ok := err.(net.Error) + return ok && neterr != nil && neterr.Timeout() +} + // IsProbableEOF returns true if the given error resembles a connection termination // scenario that would justify assuming that the watch is empty. // These errors are what the Go http stack returns back to us which are general diff --git a/staging/src/k8s.io/apimachinery/pkg/watch/streamwatcher.go b/staging/src/k8s.io/apimachinery/pkg/watch/streamwatcher.go index 8af256eb12a..4269a836a87 100644 --- a/staging/src/k8s.io/apimachinery/pkg/watch/streamwatcher.go +++ b/staging/src/k8s.io/apimachinery/pkg/watch/streamwatcher.go @@ -113,7 +113,7 @@ func (sw *StreamWatcher) receive() { case io.ErrUnexpectedEOF: klog.V(1).Infof("Unexpected EOF during watch stream event decoding: %v", err) default: - if net.IsProbableEOF(err) { + if net.IsProbableEOF(err) || net.IsTimeout(err) { klog.V(5).Infof("Unable to decode an event from the watch stream: %v", err) } else { sw.result <- Event{ diff --git a/staging/src/k8s.io/client-go/rest/request.go b/staging/src/k8s.io/client-go/rest/request.go index fe048ec421a..96fb19ce87c 100644 --- a/staging/src/k8s.io/client-go/rest/request.go +++ b/staging/src/k8s.io/client-go/rest/request.go @@ -669,7 +669,7 @@ func (r *Request) Watch(ctx context.Context) (watch.Interface, error) { if err != nil { // The watch stream mechanism handles many common partial data errors, so closed // connections can be retried in many cases. - if net.IsProbableEOF(err) { + if net.IsProbableEOF(err) || net.IsTimeout(err) { return watch.NewEmptyWatch(), nil } return nil, err diff --git a/staging/src/k8s.io/client-go/tools/cache/reflector.go b/staging/src/k8s.io/client-go/tools/cache/reflector.go index 7c581cedaff..8dfc416945a 100644 --- a/staging/src/k8s.io/client-go/tools/cache/reflector.go +++ b/staging/src/k8s.io/client-go/tools/cache/reflector.go @@ -396,6 +396,8 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { AllowWatchBookmarks: true, } + // start the clock before sending the request, since some proxies won't flush headers until after the first watch event is sent + start := r.clock.Now() w, err := r.listerWatcher.Watch(options) if err != nil { // If this is "connection refused" error, it means that most likely apiserver is not responsive. @@ -409,7 +411,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { return err } - if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil { + if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil { if err != errorStopRequested { switch { case isExpiredError(err): @@ -436,8 +438,7 @@ func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) err } // watchHandler watches w and keeps *resourceVersion up to date. -func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error { - start := r.clock.Now() +func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error { eventCount := 0 // Stopping the watcher should be idempotent and if we return from this function there's no way diff --git a/staging/src/k8s.io/client-go/tools/cache/reflector_test.go b/staging/src/k8s.io/client-go/tools/cache/reflector_test.go index 59b66256d85..3e2fcc19bf2 100644 --- a/staging/src/k8s.io/client-go/tools/cache/reflector_test.go +++ b/staging/src/k8s.io/client-go/tools/cache/reflector_test.go @@ -136,7 +136,7 @@ func TestReflectorWatchHandlerError(t *testing.T) { fw.Stop() }() var resumeRV string - err := g.watchHandler(fw, &resumeRV, nevererrc, wait.NeverStop) + err := g.watchHandler(time.Now(), fw, &resumeRV, nevererrc, wait.NeverStop) if err == nil { t.Errorf("unexpected non-error") } @@ -156,7 +156,7 @@ func TestReflectorWatchHandler(t *testing.T) { fw.Stop() }() var resumeRV string - err := g.watchHandler(fw, &resumeRV, nevererrc, wait.NeverStop) + err := g.watchHandler(time.Now(), fw, &resumeRV, nevererrc, wait.NeverStop) if err != nil { t.Errorf("unexpected error %v", err) } @@ -205,7 +205,7 @@ func TestReflectorStopWatch(t *testing.T) { var resumeRV string stopWatch := make(chan struct{}, 1) stopWatch <- struct{}{} - err := g.watchHandler(fw, &resumeRV, nevererrc, stopWatch) + err := g.watchHandler(time.Now(), fw, &resumeRV, nevererrc, stopWatch) if err != errorStopRequested { t.Errorf("expected stop error, got %q", err) } diff --git a/staging/src/k8s.io/client-go/tools/watch/retrywatcher.go b/staging/src/k8s.io/client-go/tools/watch/retrywatcher.go index 47ae9df4afd..edf28d164d0 100644 --- a/staging/src/k8s.io/client-go/tools/watch/retrywatcher.go +++ b/staging/src/k8s.io/client-go/tools/watch/retrywatcher.go @@ -120,7 +120,7 @@ func (rw *RetryWatcher) doReceive() (bool, time.Duration) { default: msg := "Watch failed: %v" - if net.IsProbableEOF(err) { + if net.IsProbableEOF(err) || net.IsTimeout(err) { klog.V(5).Infof(msg, err) // Retry return false, 0 diff --git a/test/cmd/request-timeout.sh b/test/cmd/request-timeout.sh index de5838e7511..84dabb3552d 100755 --- a/test/cmd/request-timeout.sh +++ b/test/cmd/request-timeout.sh @@ -38,7 +38,7 @@ run_kubectl_request_timeout_tests() { kube::test::if_has_string "${output_message}" 'valid-pod' ## check --request-timeout on 'get pod' with --watch - output_message=$(kubectl get pod valid-pod --request-timeout=1 --watch 2>&1) + output_message=$(kubectl get pod valid-pod --request-timeout=1 --watch --v=5 2>&1) kube::test::if_has_string "${output_message}" 'Timeout exceeded while reading body' ## check --request-timeout value with no time unit diff --git a/test/integration/apimachinery/BUILD b/test/integration/apimachinery/BUILD index 33318cc0765..d6dd4064e67 100644 --- a/test/integration/apimachinery/BUILD +++ b/test/integration/apimachinery/BUILD @@ -5,6 +5,7 @@ go_test( srcs = [ "main_test.go", "watch_restart_test.go", + "watch_timeout_test.go", ], tags = [ "etcd", @@ -22,6 +23,7 @@ go_test( "//staging/src/k8s.io/client-go/rest:go_default_library", "//staging/src/k8s.io/client-go/tools/cache:go_default_library", "//staging/src/k8s.io/client-go/tools/watch:go_default_library", + "//staging/src/k8s.io/kubectl/pkg/proxy:go_default_library", "//test/integration/framework:go_default_library", ], ) diff --git a/test/integration/apimachinery/watch_timeout_test.go b/test/integration/apimachinery/watch_timeout_test.go new file mode 100644 index 00000000000..cabd37cc6dc --- /dev/null +++ b/test/integration/apimachinery/watch_timeout_test.go @@ -0,0 +1,129 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package apimachinery + +import ( + "context" + "net/http/httptest" + "net/http/httputil" + "net/url" + "testing" + "time" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/kubernetes" + restclient "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" + kubectlproxy "k8s.io/kubectl/pkg/proxy" + "k8s.io/kubernetes/test/integration/framework" +) + +func TestWatchClientTimeout(t *testing.T) { + masterConfig := framework.NewIntegrationTestMasterConfig() + _, s, closeFn := framework.RunAMaster(masterConfig) + defer closeFn() + + t.Run("direct", func(t *testing.T) { + t.Logf("client at %s", s.URL) + testWatchClientTimeouts(t, s.URL) + }) + + t.Run("reverse proxy", func(t *testing.T) { + u, _ := url.Parse(s.URL) + proxy := httputil.NewSingleHostReverseProxy(u) + proxy.FlushInterval = -1 + proxyServer := httptest.NewServer(httputil.NewSingleHostReverseProxy(u)) + defer proxyServer.Close() + + t.Logf("client to %s, backend at %s", proxyServer.URL, s.URL) + testWatchClientTimeouts(t, proxyServer.URL) + }) + + t.Run("kubectl proxy", func(t *testing.T) { + kubectlProxyServer, err := kubectlproxy.NewServer("", "/", "/static/", nil, &restclient.Config{Host: s.URL, Timeout: 2 * time.Second}, 0) + if err != nil { + t.Fatal(err) + } + kubectlProxyListener, err := kubectlProxyServer.Listen("", 0) + if err != nil { + t.Fatal(err) + } + defer kubectlProxyListener.Close() + go kubectlProxyServer.ServeOnListener(kubectlProxyListener) + + t.Logf("client to %s, backend at %s", kubectlProxyListener.Addr().String(), s.URL) + testWatchClientTimeouts(t, "http://"+kubectlProxyListener.Addr().String()) + }) +} + +func testWatchClientTimeouts(t *testing.T, url string) { + t.Run("timeout", func(t *testing.T) { + testWatchClientTimeout(t, url, time.Second, 0) + }) + t.Run("timeoutSeconds", func(t *testing.T) { + testWatchClientTimeout(t, url, 0, time.Second) + }) + t.Run("timeout+timeoutSeconds", func(t *testing.T) { + testWatchClientTimeout(t, url, time.Second, time.Second) + }) +} + +func testWatchClientTimeout(t *testing.T, serverURL string, timeout, timeoutSeconds time.Duration) { + // client + client, err := kubernetes.NewForConfig(&restclient.Config{Host: serverURL, Timeout: timeout}) + if err != nil { + t.Fatal(err) + } + + listCount := 0 + watchCount := 0 + stopCh := make(chan struct{}) + listWatch := &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + t.Logf("listing (version=%s continue=%s)", options.ResourceVersion, options.Continue) + listCount++ + if listCount > 1 { + t.Errorf("listed more than once") + close(stopCh) + } + return client.CoreV1().ConfigMaps(metav1.NamespaceAll).List(context.TODO(), options) + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + t.Logf("watching (version=%s)", options.ResourceVersion) + if timeoutSeconds != 0 { + timeout := int64(timeoutSeconds / time.Second) + options.TimeoutSeconds = &timeout + } + watchCount++ + if watchCount > 1 { + // success, restarted watch + close(stopCh) + } + return client.CoreV1().ConfigMaps(metav1.NamespaceAll).Watch(context.TODO(), options) + }, + } + _, informer := cache.NewIndexerInformer(listWatch, &corev1.ConfigMap{}, 30*time.Minute, cache.ResourceEventHandlerFuncs{}, cache.Indexers{}) + informer.Run(stopCh) + select { + case <-stopCh: + case <-time.After(time.Minute): + t.Fatal("timeout") + } +}