mirror of
				https://github.com/k3s-io/kubernetes.git
				synced 2025-11-04 07:49:35 +00:00 
			
		
		
		
	Merge pull request #89652 from liggitt/relist-timeout
Fix client watch reestablishment handling of client-side timeouts
This commit is contained in:
		@@ -55,6 +55,12 @@ func JoinPreservingTrailingSlash(elem ...string) string {
 | 
				
			|||||||
	return result
 | 
						return result
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// IsTimeout reports whether err is a net.Error whose Timeout method returns true.
					// It returns false for a nil error and for any error that does not implement net.Error.
					func IsTimeout(err error) bool {
						if netErr, isNetErr := err.(net.Error); isNetErr && netErr != nil {
							return netErr.Timeout()
						}
						return false
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// IsProbableEOF returns true if the given error resembles a connection termination
 | 
					// IsProbableEOF returns true if the given error resembles a connection termination
 | 
				
			||||||
// scenario that would justify assuming that the watch is empty.
 | 
					// scenario that would justify assuming that the watch is empty.
 | 
				
			||||||
// These errors are what the Go http stack returns back to us which are general
 | 
					// These errors are what the Go http stack returns back to us which are general
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -113,7 +113,7 @@ func (sw *StreamWatcher) receive() {
 | 
				
			|||||||
			case io.ErrUnexpectedEOF:
 | 
								case io.ErrUnexpectedEOF:
 | 
				
			||||||
				klog.V(1).Infof("Unexpected EOF during watch stream event decoding: %v", err)
 | 
									klog.V(1).Infof("Unexpected EOF during watch stream event decoding: %v", err)
 | 
				
			||||||
			default:
 | 
								default:
 | 
				
			||||||
				if net.IsProbableEOF(err) {
 | 
									if net.IsProbableEOF(err) || net.IsTimeout(err) {
 | 
				
			||||||
					klog.V(5).Infof("Unable to decode an event from the watch stream: %v", err)
 | 
										klog.V(5).Infof("Unable to decode an event from the watch stream: %v", err)
 | 
				
			||||||
				} else {
 | 
									} else {
 | 
				
			||||||
					sw.result <- Event{
 | 
										sw.result <- Event{
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -669,7 +669,7 @@ func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
 | 
				
			|||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		// The watch stream mechanism handles many common partial data errors, so closed
 | 
							// The watch stream mechanism handles many common partial data errors, so closed
 | 
				
			||||||
		// connections can be retried in many cases.
 | 
							// connections can be retried in many cases.
 | 
				
			||||||
		if net.IsProbableEOF(err) {
 | 
							if net.IsProbableEOF(err) || net.IsTimeout(err) {
 | 
				
			||||||
			return watch.NewEmptyWatch(), nil
 | 
								return watch.NewEmptyWatch(), nil
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		return nil, err
 | 
							return nil, err
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -396,6 +396,8 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
 | 
				
			|||||||
			AllowWatchBookmarks: true,
 | 
								AllowWatchBookmarks: true,
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							// start the clock before sending the request, since some proxies won't flush headers until after the first watch event is sent
 | 
				
			||||||
 | 
							start := r.clock.Now()
 | 
				
			||||||
		w, err := r.listerWatcher.Watch(options)
 | 
							w, err := r.listerWatcher.Watch(options)
 | 
				
			||||||
		if err != nil {
 | 
							if err != nil {
 | 
				
			||||||
			// If this is "connection refused" error, it means that most likely apiserver is not responsive.
 | 
								// If this is "connection refused" error, it means that most likely apiserver is not responsive.
 | 
				
			||||||
@@ -409,7 +411,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
 | 
				
			|||||||
			return err
 | 
								return err
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
 | 
							if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil {
 | 
				
			||||||
			if err != errorStopRequested {
 | 
								if err != errorStopRequested {
 | 
				
			||||||
				switch {
 | 
									switch {
 | 
				
			||||||
				case isExpiredError(err):
 | 
									case isExpiredError(err):
 | 
				
			||||||
@@ -436,8 +438,7 @@ func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) err
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// watchHandler watches w and keeps *resourceVersion up to date.
 | 
					// watchHandler watches w and keeps *resourceVersion up to date.
 | 
				
			||||||
func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
 | 
					func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
 | 
				
			||||||
	start := r.clock.Now()
 | 
					 | 
				
			||||||
	eventCount := 0
 | 
						eventCount := 0
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Stopping the watcher should be idempotent and if we return from this function there's no way
 | 
						// Stopping the watcher should be idempotent and if we return from this function there's no way
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -136,7 +136,7 @@ func TestReflectorWatchHandlerError(t *testing.T) {
 | 
				
			|||||||
		fw.Stop()
 | 
							fw.Stop()
 | 
				
			||||||
	}()
 | 
						}()
 | 
				
			||||||
	var resumeRV string
 | 
						var resumeRV string
 | 
				
			||||||
	err := g.watchHandler(fw, &resumeRV, nevererrc, wait.NeverStop)
 | 
						err := g.watchHandler(time.Now(), fw, &resumeRV, nevererrc, wait.NeverStop)
 | 
				
			||||||
	if err == nil {
 | 
						if err == nil {
 | 
				
			||||||
		t.Errorf("unexpected non-error")
 | 
							t.Errorf("unexpected non-error")
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
@@ -156,7 +156,7 @@ func TestReflectorWatchHandler(t *testing.T) {
 | 
				
			|||||||
		fw.Stop()
 | 
							fw.Stop()
 | 
				
			||||||
	}()
 | 
						}()
 | 
				
			||||||
	var resumeRV string
 | 
						var resumeRV string
 | 
				
			||||||
	err := g.watchHandler(fw, &resumeRV, nevererrc, wait.NeverStop)
 | 
						err := g.watchHandler(time.Now(), fw, &resumeRV, nevererrc, wait.NeverStop)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		t.Errorf("unexpected error %v", err)
 | 
							t.Errorf("unexpected error %v", err)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
@@ -205,7 +205,7 @@ func TestReflectorStopWatch(t *testing.T) {
 | 
				
			|||||||
	var resumeRV string
 | 
						var resumeRV string
 | 
				
			||||||
	stopWatch := make(chan struct{}, 1)
 | 
						stopWatch := make(chan struct{}, 1)
 | 
				
			||||||
	stopWatch <- struct{}{}
 | 
						stopWatch <- struct{}{}
 | 
				
			||||||
	err := g.watchHandler(fw, &resumeRV, nevererrc, stopWatch)
 | 
						err := g.watchHandler(time.Now(), fw, &resumeRV, nevererrc, stopWatch)
 | 
				
			||||||
	if err != errorStopRequested {
 | 
						if err != errorStopRequested {
 | 
				
			||||||
		t.Errorf("expected stop error, got %q", err)
 | 
							t.Errorf("expected stop error, got %q", err)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -120,7 +120,7 @@ func (rw *RetryWatcher) doReceive() (bool, time.Duration) {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	default:
 | 
						default:
 | 
				
			||||||
		msg := "Watch failed: %v"
 | 
							msg := "Watch failed: %v"
 | 
				
			||||||
		if net.IsProbableEOF(err) {
 | 
							if net.IsProbableEOF(err) || net.IsTimeout(err) {
 | 
				
			||||||
			klog.V(5).Infof(msg, err)
 | 
								klog.V(5).Infof(msg, err)
 | 
				
			||||||
			// Retry
 | 
								// Retry
 | 
				
			||||||
			return false, 0
 | 
								return false, 0
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -38,7 +38,7 @@ run_kubectl_request_timeout_tests() {
 | 
				
			|||||||
  kube::test::if_has_string "${output_message}" 'valid-pod'
 | 
					  kube::test::if_has_string "${output_message}" 'valid-pod'
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  ## check --request-timeout on 'get pod' with --watch
 | 
					  ## check --request-timeout on 'get pod' with --watch
 | 
				
			||||||
  output_message=$(kubectl get pod valid-pod --request-timeout=1 --watch 2>&1)
 | 
					  output_message=$(kubectl get pod valid-pod --request-timeout=1 --watch --v=5 2>&1)
 | 
				
			||||||
  kube::test::if_has_string "${output_message}" 'Timeout exceeded while reading body'
 | 
					  kube::test::if_has_string "${output_message}" 'Timeout exceeded while reading body'
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  ## check --request-timeout value with no time unit
 | 
					  ## check --request-timeout value with no time unit
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -5,6 +5,7 @@ go_test(
 | 
				
			|||||||
    srcs = [
 | 
					    srcs = [
 | 
				
			||||||
        "main_test.go",
 | 
					        "main_test.go",
 | 
				
			||||||
        "watch_restart_test.go",
 | 
					        "watch_restart_test.go",
 | 
				
			||||||
 | 
					        "watch_timeout_test.go",
 | 
				
			||||||
    ],
 | 
					    ],
 | 
				
			||||||
    tags = [
 | 
					    tags = [
 | 
				
			||||||
        "etcd",
 | 
					        "etcd",
 | 
				
			||||||
@@ -22,6 +23,7 @@ go_test(
 | 
				
			|||||||
        "//staging/src/k8s.io/client-go/rest:go_default_library",
 | 
					        "//staging/src/k8s.io/client-go/rest:go_default_library",
 | 
				
			||||||
        "//staging/src/k8s.io/client-go/tools/cache:go_default_library",
 | 
					        "//staging/src/k8s.io/client-go/tools/cache:go_default_library",
 | 
				
			||||||
        "//staging/src/k8s.io/client-go/tools/watch:go_default_library",
 | 
					        "//staging/src/k8s.io/client-go/tools/watch:go_default_library",
 | 
				
			||||||
 | 
					        "//staging/src/k8s.io/kubectl/pkg/proxy:go_default_library",
 | 
				
			||||||
        "//test/integration/framework:go_default_library",
 | 
					        "//test/integration/framework:go_default_library",
 | 
				
			||||||
    ],
 | 
					    ],
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										129
									
								
								test/integration/apimachinery/watch_timeout_test.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										129
									
								
								test/integration/apimachinery/watch_timeout_test.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,129 @@
 | 
				
			|||||||
 | 
					/*
 | 
				
			||||||
 | 
					Copyright 2020 The Kubernetes Authors.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					Licensed under the Apache License, Version 2.0 (the "License");
 | 
				
			||||||
 | 
					you may not use this file except in compliance with the License.
 | 
				
			||||||
 | 
					You may obtain a copy of the License at
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    http://www.apache.org/licenses/LICENSE-2.0
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					Unless required by applicable law or agreed to in writing, software
 | 
				
			||||||
 | 
					distributed under the License is distributed on an "AS IS" BASIS,
 | 
				
			||||||
 | 
					WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
				
			||||||
 | 
					See the License for the specific language governing permissions and
 | 
				
			||||||
 | 
					limitations under the License.
 | 
				
			||||||
 | 
					*/
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					package apimachinery
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import (
 | 
				
			||||||
 | 
						"context"
 | 
				
			||||||
 | 
						"net/http/httptest"
 | 
				
			||||||
 | 
						"net/http/httputil"
 | 
				
			||||||
 | 
						"net/url"
 | 
				
			||||||
 | 
						"testing"
 | 
				
			||||||
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						corev1 "k8s.io/api/core/v1"
 | 
				
			||||||
 | 
						metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
				
			||||||
 | 
						"k8s.io/apimachinery/pkg/runtime"
 | 
				
			||||||
 | 
						"k8s.io/apimachinery/pkg/watch"
 | 
				
			||||||
 | 
						"k8s.io/client-go/kubernetes"
 | 
				
			||||||
 | 
						restclient "k8s.io/client-go/rest"
 | 
				
			||||||
 | 
						"k8s.io/client-go/tools/cache"
 | 
				
			||||||
 | 
						kubectlproxy "k8s.io/kubectl/pkg/proxy"
 | 
				
			||||||
 | 
						"k8s.io/kubernetes/test/integration/framework"
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func TestWatchClientTimeout(t *testing.T) {
 | 
				
			||||||
 | 
						masterConfig := framework.NewIntegrationTestMasterConfig()
 | 
				
			||||||
 | 
						_, s, closeFn := framework.RunAMaster(masterConfig)
 | 
				
			||||||
 | 
						defer closeFn()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						t.Run("direct", func(t *testing.T) {
 | 
				
			||||||
 | 
							t.Logf("client at %s", s.URL)
 | 
				
			||||||
 | 
							testWatchClientTimeouts(t, s.URL)
 | 
				
			||||||
 | 
						})
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						t.Run("reverse proxy", func(t *testing.T) {
 | 
				
			||||||
 | 
							u, _ := url.Parse(s.URL)
 | 
				
			||||||
 | 
							proxy := httputil.NewSingleHostReverseProxy(u)
 | 
				
			||||||
 | 
							proxy.FlushInterval = -1
 | 
				
			||||||
 | 
							proxyServer := httptest.NewServer(httputil.NewSingleHostReverseProxy(u))
 | 
				
			||||||
 | 
							defer proxyServer.Close()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							t.Logf("client to %s, backend at %s", proxyServer.URL, s.URL)
 | 
				
			||||||
 | 
							testWatchClientTimeouts(t, proxyServer.URL)
 | 
				
			||||||
 | 
						})
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						t.Run("kubectl proxy", func(t *testing.T) {
 | 
				
			||||||
 | 
							kubectlProxyServer, err := kubectlproxy.NewServer("", "/", "/static/", nil, &restclient.Config{Host: s.URL, Timeout: 2 * time.Second}, 0)
 | 
				
			||||||
 | 
							if err != nil {
 | 
				
			||||||
 | 
								t.Fatal(err)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							kubectlProxyListener, err := kubectlProxyServer.Listen("", 0)
 | 
				
			||||||
 | 
							if err != nil {
 | 
				
			||||||
 | 
								t.Fatal(err)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							defer kubectlProxyListener.Close()
 | 
				
			||||||
 | 
							go kubectlProxyServer.ServeOnListener(kubectlProxyListener)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							t.Logf("client to %s, backend at %s", kubectlProxyListener.Addr().String(), s.URL)
 | 
				
			||||||
 | 
							testWatchClientTimeouts(t, "http://"+kubectlProxyListener.Addr().String())
 | 
				
			||||||
 | 
						})
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func testWatchClientTimeouts(t *testing.T, url string) {
 | 
				
			||||||
 | 
						t.Run("timeout", func(t *testing.T) {
 | 
				
			||||||
 | 
							testWatchClientTimeout(t, url, time.Second, 0)
 | 
				
			||||||
 | 
						})
 | 
				
			||||||
 | 
						t.Run("timeoutSeconds", func(t *testing.T) {
 | 
				
			||||||
 | 
							testWatchClientTimeout(t, url, 0, time.Second)
 | 
				
			||||||
 | 
						})
 | 
				
			||||||
 | 
						t.Run("timeout+timeoutSeconds", func(t *testing.T) {
 | 
				
			||||||
 | 
							testWatchClientTimeout(t, url, time.Second, time.Second)
 | 
				
			||||||
 | 
						})
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func testWatchClientTimeout(t *testing.T, serverURL string, timeout, timeoutSeconds time.Duration) {
 | 
				
			||||||
 | 
						// client
 | 
				
			||||||
 | 
						client, err := kubernetes.NewForConfig(&restclient.Config{Host: serverURL, Timeout: timeout})
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							t.Fatal(err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						listCount := 0
 | 
				
			||||||
 | 
						watchCount := 0
 | 
				
			||||||
 | 
						stopCh := make(chan struct{})
 | 
				
			||||||
 | 
						listWatch := &cache.ListWatch{
 | 
				
			||||||
 | 
							ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
 | 
				
			||||||
 | 
								t.Logf("listing (version=%s continue=%s)", options.ResourceVersion, options.Continue)
 | 
				
			||||||
 | 
								listCount++
 | 
				
			||||||
 | 
								if listCount > 1 {
 | 
				
			||||||
 | 
									t.Errorf("listed more than once")
 | 
				
			||||||
 | 
									close(stopCh)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								return client.CoreV1().ConfigMaps(metav1.NamespaceAll).List(context.TODO(), options)
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
 | 
				
			||||||
 | 
								t.Logf("watching (version=%s)", options.ResourceVersion)
 | 
				
			||||||
 | 
								if timeoutSeconds != 0 {
 | 
				
			||||||
 | 
									timeout := int64(timeoutSeconds / time.Second)
 | 
				
			||||||
 | 
									options.TimeoutSeconds = &timeout
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								watchCount++
 | 
				
			||||||
 | 
								if watchCount > 1 {
 | 
				
			||||||
 | 
									// success, restarted watch
 | 
				
			||||||
 | 
									close(stopCh)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								return client.CoreV1().ConfigMaps(metav1.NamespaceAll).Watch(context.TODO(), options)
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						_, informer := cache.NewIndexerInformer(listWatch, &corev1.ConfigMap{}, 30*time.Minute, cache.ResourceEventHandlerFuncs{}, cache.Indexers{})
 | 
				
			||||||
 | 
						informer.Run(stopCh)
 | 
				
			||||||
 | 
						select {
 | 
				
			||||||
 | 
						case <-stopCh:
 | 
				
			||||||
 | 
						case <-time.After(time.Minute):
 | 
				
			||||||
 | 
							t.Fatal("timeout")
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
		Reference in New Issue
	
	Block a user