diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/apiserver_test.go b/staging/src/k8s.io/apiserver/pkg/endpoints/apiserver_test.go index ddd74e7d78c..69dc93c2614 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/apiserver_test.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/apiserver_test.go @@ -412,7 +412,7 @@ func (obj *SimpleStream) DeepCopyObject() runtime.Object { panic("SimpleStream does not support DeepCopy") } -func (s *SimpleStream) InputStream(version, accept string) (io.ReadCloser, bool, string, error) { +func (s *SimpleStream) InputStream(_ context.Context, version, accept string) (io.ReadCloser, bool, string, error) { s.version = version s.accept = accept return s, false, s.contentType, s.err diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers.go index 0add873d9d3..c3d3728c568 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers.go @@ -56,7 +56,7 @@ func WriteObject(statusCode int, gv schema.GroupVersion, s runtime.NegotiatedSer // If the client requests a websocket upgrade, negotiate for a websocket reader protocol (because many // browser clients cannot easily handle binary streaming protocols). func StreamObject(statusCode int, gv schema.GroupVersion, s runtime.NegotiatedSerializer, stream rest.ResourceStreamer, w http.ResponseWriter, req *http.Request) { - out, flush, contentType, err := stream.InputStream(gv.String(), req.Header.Get("Accept")) + out, flush, contentType, err := stream.InputStream(req.Context(), gv.String(), req.Header.Get("Accept")) if err != nil { ErrorNegotiated(err, s, gv, w, req) return diff --git a/staging/src/k8s.io/apiserver/pkg/registry/generic/rest/streamer.go b/staging/src/k8s.io/apiserver/pkg/registry/generic/rest/streamer.go index d84a01ab6c5..acab7652e1f 100644 --- a/staging/src/k8s.io/apiserver/pkg/registry/generic/rest/streamer.go +++ b/staging/src/k8s.io/apiserver/pkg/registry/generic/rest/streamer.go @@ -17,6 +17,7 @@ limitations under the License. package rest import ( + "context" "io" "net/http" "net/url" @@ -49,7 +50,7 @@ func (obj *LocationStreamer) DeepCopyObject() runtime.Object { // InputStream returns a stream with the contents of the URL location. If no location is provided, // a null stream is returned. -func (s *LocationStreamer) InputStream(apiVersion, acceptHeader string) (stream io.ReadCloser, flush bool, contentType string, err error) { +func (s *LocationStreamer) InputStream(ctx context.Context, apiVersion, acceptHeader string) (stream io.ReadCloser, flush bool, contentType string, err error) { if s.Location == nil { // If no location was provided, return a null stream return nil, false, "", nil @@ -59,7 +60,12 @@ func (s *LocationStreamer) InputStream(apiVersion, acceptHeader string) (stream transport = http.DefaultTransport } client := &http.Client{Transport: transport} - resp, err := client.Get(s.Location.String()) + req, err := http.NewRequest("GET", s.Location.String(), nil) + // Pass the parent context down to the request to ensure that the resources + // will be release properly. + req = req.WithContext(ctx) + + resp, err := client.Do(req) if err != nil { return nil, false, "", err } diff --git a/staging/src/k8s.io/apiserver/pkg/registry/generic/rest/streamer_test.go b/staging/src/k8s.io/apiserver/pkg/registry/generic/rest/streamer_test.go index 4372c62cc68..11714cc1566 100644 --- a/staging/src/k8s.io/apiserver/pkg/registry/generic/rest/streamer_test.go +++ b/staging/src/k8s.io/apiserver/pkg/registry/generic/rest/streamer_test.go @@ -19,6 +19,7 @@ package rest import ( "bufio" "bytes" + "context" "fmt" "io/ioutil" "net/http" @@ -45,7 +46,7 @@ func TestInputStreamReader(t *testing.T) { streamer := &LocationStreamer{ Location: u, } - readCloser, _, _, err := streamer.InputStream("", "") + readCloser, _, _, err := streamer.InputStream(context.Background(), "", "") if err != nil { t.Errorf("Unexpected error when getting stream: %v", err) return @@ -61,7 +62,7 @@ func TestInputStreamNullLocation(t *testing.T) { streamer := &LocationStreamer{ Location: nil, } - readCloser, _, _, err := streamer.InputStream("", "") + readCloser, _, _, err := streamer.InputStream(context.Background(), "", "") if err != nil { t.Errorf("Unexpected error when getting stream with null location: %v", err) } @@ -91,7 +92,7 @@ func TestInputStreamContentType(t *testing.T) { Location: location, Transport: fakeTransport("application/json", "hello world"), } - readCloser, _, contentType, err := streamer.InputStream("", "") + readCloser, _, contentType, err := streamer.InputStream(context.Background(), "", "") if err != nil { t.Errorf("Unexpected error when getting stream: %v", err) return @@ -109,7 +110,7 @@ func TestInputStreamTransport(t *testing.T) { Location: location, Transport: fakeTransport("text/plain", message), } - readCloser, _, _, err := streamer.InputStream("", "") + readCloser, _, _, err := streamer.InputStream(context.Background(), "", "") if err != nil { t.Errorf("Unexpected error when getting stream: %v", err) return @@ -136,7 +137,7 @@ func TestInputStreamInternalServerErrorTransport(t *testing.T) { } expectedError := errors.NewInternalError(fmt.Errorf("%s", message)) - _, _, _, err := streamer.InputStream("", "") + _, _, _, err := streamer.InputStream(context.Background(), "", "") if err == nil { t.Errorf("unexpected non-error") return diff --git a/staging/src/k8s.io/apiserver/pkg/registry/rest/rest.go b/staging/src/k8s.io/apiserver/pkg/registry/rest/rest.go index 0c67ce3e35f..572f924e73b 100644 --- a/staging/src/k8s.io/apiserver/pkg/registry/rest/rest.go +++ b/staging/src/k8s.io/apiserver/pkg/registry/rest/rest.go @@ -320,7 +320,7 @@ type ResourceStreamer interface { // the caller may return a flag indicating whether the result should be flushed as writes occur // and a content type string that indicates the type of the stream. // If a null stream is returned, a StatusNoContent response wil be generated. - InputStream(apiVersion, acceptHeader string) (stream io.ReadCloser, flush bool, mimeType string, err error) + InputStream(ctx context.Context, apiVersion, acceptHeader string) (stream io.ReadCloser, flush bool, mimeType string, err error) } // StorageMetadata is an optional interface that callers can implement to provide additional