From 31d1607a514b62ef46452e402f5438d827314b98 Mon Sep 17 00:00:00 2001 From: Yu-Ju Hong Date: Mon, 13 Aug 2018 16:34:49 -0700 Subject: [PATCH 1/2] apiserver: pass the parent request context when creating InputStream This ensures that request cancellation will be propagated properly to the client used to create the stream. Without this fix, the apiserver and the kubelet may leak resources (e.g., goroutine, inotify watches). One such example is that if user run `kubectl logs -f Date: Mon, 13 Aug 2018 17:35:00 -0700 Subject: [PATCH 2/2] Update the InputStream tests --- .../k8s.io/apiserver/pkg/endpoints/apiserver_test.go | 2 +- .../pkg/registry/generic/rest/streamer_test.go | 11 ++++++----- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/apiserver_test.go b/staging/src/k8s.io/apiserver/pkg/endpoints/apiserver_test.go index ddd74e7d78c..69dc93c2614 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/apiserver_test.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/apiserver_test.go @@ -412,7 +412,7 @@ func (obj *SimpleStream) DeepCopyObject() runtime.Object { panic("SimpleStream does not support DeepCopy") } -func (s *SimpleStream) InputStream(version, accept string) (io.ReadCloser, bool, string, error) { +func (s *SimpleStream) InputStream(_ context.Context, version, accept string) (io.ReadCloser, bool, string, error) { s.version = version s.accept = accept return s, false, s.contentType, s.err diff --git a/staging/src/k8s.io/apiserver/pkg/registry/generic/rest/streamer_test.go b/staging/src/k8s.io/apiserver/pkg/registry/generic/rest/streamer_test.go index 4372c62cc68..11714cc1566 100644 --- a/staging/src/k8s.io/apiserver/pkg/registry/generic/rest/streamer_test.go +++ b/staging/src/k8s.io/apiserver/pkg/registry/generic/rest/streamer_test.go @@ -19,6 +19,7 @@ package rest import ( "bufio" "bytes" + "context" "fmt" "io/ioutil" "net/http" @@ -45,7 +46,7 @@ func TestInputStreamReader(t *testing.T) { streamer := &LocationStreamer{ Location: u, } - readCloser, _, _, err := streamer.InputStream("", "") + readCloser, _, _, err := streamer.InputStream(context.Background(), "", "") if err != nil { t.Errorf("Unexpected error when getting stream: %v", err) return @@ -61,7 +62,7 @@ func TestInputStreamNullLocation(t *testing.T) { streamer := &LocationStreamer{ Location: nil, } - readCloser, _, _, err := streamer.InputStream("", "") + readCloser, _, _, err := streamer.InputStream(context.Background(), "", "") if err != nil { t.Errorf("Unexpected error when getting stream with null location: %v", err) } @@ -91,7 +92,7 @@ func TestInputStreamContentType(t *testing.T) { Location: location, Transport: fakeTransport("application/json", "hello world"), } - readCloser, _, contentType, err := streamer.InputStream("", "") + readCloser, _, contentType, err := streamer.InputStream(context.Background(), "", "") if err != nil { t.Errorf("Unexpected error when getting stream: %v", err) return @@ -109,7 +110,7 @@ func TestInputStreamTransport(t *testing.T) { Location: location, Transport: fakeTransport("text/plain", message), } - readCloser, _, _, err := streamer.InputStream("", "") + readCloser, _, _, err := streamer.InputStream(context.Background(), "", "") if err != nil { t.Errorf("Unexpected error when getting stream: %v", err) return @@ -136,7 +137,7 @@ func TestInputStreamInternalServerErrorTransport(t *testing.T) { } expectedError := errors.NewInternalError(fmt.Errorf("%s", message)) - _, _, _, err := streamer.InputStream("", "") + _, _, _, err := streamer.InputStream(context.Background(), "", "") if err == nil { t.Errorf("unexpected non-error") return