apiserver: pass the parent request context when creating InputStream

This ensures that request cancellation will be propagated properly to
the client used to create the stream. Without this fix, the apiserver
and the kubelet may leak resources (e.g., goroutine, inotify watches).
One such example is that if user run `kubectl logs -f <container that
don't produce new logs)` and then enter ctrl-c, both kubelet and
apiserver will hold on to the connection and resources indefinitely.
This commit is contained in:
Yu-Ju Hong 2018-08-13 16:34:49 -07:00
parent fbb2dfcc6a
commit 31d1607a51
3 changed files with 10 additions and 4 deletions

View File

@ -56,7 +56,7 @@ func WriteObject(statusCode int, gv schema.GroupVersion, s runtime.NegotiatedSer
// If the client requests a websocket upgrade, negotiate for a websocket reader protocol (because many
// browser clients cannot easily handle binary streaming protocols).
func StreamObject(statusCode int, gv schema.GroupVersion, s runtime.NegotiatedSerializer, stream rest.ResourceStreamer, w http.ResponseWriter, req *http.Request) {
out, flush, contentType, err := stream.InputStream(gv.String(), req.Header.Get("Accept"))
out, flush, contentType, err := stream.InputStream(req.Context(), gv.String(), req.Header.Get("Accept"))
if err != nil {
ErrorNegotiated(err, s, gv, w, req)
return

View File

@ -17,6 +17,7 @@ limitations under the License.
package rest
import (
"context"
"io"
"net/http"
"net/url"
@ -49,7 +50,7 @@ func (obj *LocationStreamer) DeepCopyObject() runtime.Object {
// InputStream returns a stream with the contents of the URL location. If no location is provided,
// a null stream is returned.
func (s *LocationStreamer) InputStream(apiVersion, acceptHeader string) (stream io.ReadCloser, flush bool, contentType string, err error) {
func (s *LocationStreamer) InputStream(ctx context.Context, apiVersion, acceptHeader string) (stream io.ReadCloser, flush bool, contentType string, err error) {
if s.Location == nil {
// If no location was provided, return a null stream
return nil, false, "", nil
@ -59,7 +60,12 @@ func (s *LocationStreamer) InputStream(apiVersion, acceptHeader string) (stream
transport = http.DefaultTransport
}
client := &http.Client{Transport: transport}
resp, err := client.Get(s.Location.String())
req, err := http.NewRequest("GET", s.Location.String(), nil)
// Pass the parent context down to the request to ensure that the resources
// will be release properly.
req = req.WithContext(ctx)
resp, err := client.Do(req)
if err != nil {
return nil, false, "", err
}

View File

@ -320,7 +320,7 @@ type ResourceStreamer interface {
// the caller may return a flag indicating whether the result should be flushed as writes occur
// and a content type string that indicates the type of the stream.
// If a null stream is returned, a StatusNoContent response wil be generated.
InputStream(apiVersion, acceptHeader string) (stream io.ReadCloser, flush bool, mimeType string, err error)
InputStream(ctx context.Context, apiVersion, acceptHeader string) (stream io.ReadCloser, flush bool, mimeType string, err error)
}
// StorageMetadata is an optional interface that callers can implement to provide additional