diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers.go index 0add873d9d3..c3d3728c568 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers.go @@ -56,7 +56,7 @@ func WriteObject(statusCode int, gv schema.GroupVersion, s runtime.NegotiatedSer // If the client requests a websocket upgrade, negotiate for a websocket reader protocol (because many // browser clients cannot easily handle binary streaming protocols). func StreamObject(statusCode int, gv schema.GroupVersion, s runtime.NegotiatedSerializer, stream rest.ResourceStreamer, w http.ResponseWriter, req *http.Request) { - out, flush, contentType, err := stream.InputStream(gv.String(), req.Header.Get("Accept")) + out, flush, contentType, err := stream.InputStream(req.Context(), gv.String(), req.Header.Get("Accept")) if err != nil { ErrorNegotiated(err, s, gv, w, req) return diff --git a/staging/src/k8s.io/apiserver/pkg/registry/generic/rest/streamer.go b/staging/src/k8s.io/apiserver/pkg/registry/generic/rest/streamer.go index d84a01ab6c5..acab7652e1f 100644 --- a/staging/src/k8s.io/apiserver/pkg/registry/generic/rest/streamer.go +++ b/staging/src/k8s.io/apiserver/pkg/registry/generic/rest/streamer.go @@ -17,6 +17,7 @@ limitations under the License. package rest import ( + "context" "io" "net/http" "net/url" @@ -49,7 +50,7 @@ func (obj *LocationStreamer) DeepCopyObject() runtime.Object { // InputStream returns a stream with the contents of the URL location. If no location is provided, // a null stream is returned. -func (s *LocationStreamer) InputStream(apiVersion, acceptHeader string) (stream io.ReadCloser, flush bool, contentType string, err error) { +func (s *LocationStreamer) InputStream(ctx context.Context, apiVersion, acceptHeader string) (stream io.ReadCloser, flush bool, contentType string, err error) { if s.Location == nil { // If no location was provided, return a null stream return nil, false, "", nil @@ -59,7 +60,12 @@ func (s *LocationStreamer) InputStream(apiVersion, acceptHeader string) (stream transport = http.DefaultTransport } client := &http.Client{Transport: transport} - resp, err := client.Get(s.Location.String()) + req, err := http.NewRequest("GET", s.Location.String(), nil) + // Pass the parent context down to the request to ensure that the resources + // will be release properly. + req = req.WithContext(ctx) + + resp, err := client.Do(req) if err != nil { return nil, false, "", err } diff --git a/staging/src/k8s.io/apiserver/pkg/registry/rest/rest.go b/staging/src/k8s.io/apiserver/pkg/registry/rest/rest.go index 0c67ce3e35f..572f924e73b 100644 --- a/staging/src/k8s.io/apiserver/pkg/registry/rest/rest.go +++ b/staging/src/k8s.io/apiserver/pkg/registry/rest/rest.go @@ -320,7 +320,7 @@ type ResourceStreamer interface { // the caller may return a flag indicating whether the result should be flushed as writes occur // and a content type string that indicates the type of the stream. // If a null stream is returned, a StatusNoContent response wil be generated. - InputStream(apiVersion, acceptHeader string) (stream io.ReadCloser, flush bool, mimeType string, err error) + InputStream(ctx context.Context, apiVersion, acceptHeader string) (stream io.ReadCloser, flush bool, mimeType string, err error) } // StorageMetadata is an optional interface that callers can implement to provide additional