Merge pull request #67357 from yujuhong/add-ctx-apiserver

Automatic merge from submit-queue (batch tested with PRs 67137, 67372, 67505, 67373, 67357). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

apiserver: pass the parent request context when creating InputStream

**What this PR does / why we need it**:
This ensures that request cancellation will be propagated properly to
the client used to create the stream. Without this fix, the apiserver
and the kubelet may leak resources (e.g., goroutine, inotify watches).
One such example is that if user run `kubectl logs -f <container that
don't produce new logs)` and then enter ctrl-c, both kubelet and
apiserver will hold on to the connection and resources indefinitely.

**Which issue(s) this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close the issue(s) when PR gets merged)*:
Fixes #64315

**Special notes for your reviewer**:

**Release note**:

```release-note
NONE
```
This commit is contained in:
Kubernetes Submit Queue 2018-08-16 10:34:26 -07:00 committed by GitHub
commit b8efc41806
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 17 additions and 10 deletions

View File

@ -412,7 +412,7 @@ func (obj *SimpleStream) DeepCopyObject() runtime.Object {
panic("SimpleStream does not support DeepCopy") panic("SimpleStream does not support DeepCopy")
} }
func (s *SimpleStream) InputStream(version, accept string) (io.ReadCloser, bool, string, error) { func (s *SimpleStream) InputStream(_ context.Context, version, accept string) (io.ReadCloser, bool, string, error) {
s.version = version s.version = version
s.accept = accept s.accept = accept
return s, false, s.contentType, s.err return s, false, s.contentType, s.err

View File

@ -56,7 +56,7 @@ func WriteObject(statusCode int, gv schema.GroupVersion, s runtime.NegotiatedSer
// If the client requests a websocket upgrade, negotiate for a websocket reader protocol (because many // If the client requests a websocket upgrade, negotiate for a websocket reader protocol (because many
// browser clients cannot easily handle binary streaming protocols). // browser clients cannot easily handle binary streaming protocols).
func StreamObject(statusCode int, gv schema.GroupVersion, s runtime.NegotiatedSerializer, stream rest.ResourceStreamer, w http.ResponseWriter, req *http.Request) { func StreamObject(statusCode int, gv schema.GroupVersion, s runtime.NegotiatedSerializer, stream rest.ResourceStreamer, w http.ResponseWriter, req *http.Request) {
out, flush, contentType, err := stream.InputStream(gv.String(), req.Header.Get("Accept")) out, flush, contentType, err := stream.InputStream(req.Context(), gv.String(), req.Header.Get("Accept"))
if err != nil { if err != nil {
ErrorNegotiated(err, s, gv, w, req) ErrorNegotiated(err, s, gv, w, req)
return return

View File

@ -17,6 +17,7 @@ limitations under the License.
package rest package rest
import ( import (
"context"
"io" "io"
"net/http" "net/http"
"net/url" "net/url"
@ -49,7 +50,7 @@ func (obj *LocationStreamer) DeepCopyObject() runtime.Object {
// InputStream returns a stream with the contents of the URL location. If no location is provided, // InputStream returns a stream with the contents of the URL location. If no location is provided,
// a null stream is returned. // a null stream is returned.
func (s *LocationStreamer) InputStream(apiVersion, acceptHeader string) (stream io.ReadCloser, flush bool, contentType string, err error) { func (s *LocationStreamer) InputStream(ctx context.Context, apiVersion, acceptHeader string) (stream io.ReadCloser, flush bool, contentType string, err error) {
if s.Location == nil { if s.Location == nil {
// If no location was provided, return a null stream // If no location was provided, return a null stream
return nil, false, "", nil return nil, false, "", nil
@ -59,7 +60,12 @@ func (s *LocationStreamer) InputStream(apiVersion, acceptHeader string) (stream
transport = http.DefaultTransport transport = http.DefaultTransport
} }
client := &http.Client{Transport: transport} client := &http.Client{Transport: transport}
resp, err := client.Get(s.Location.String()) req, err := http.NewRequest("GET", s.Location.String(), nil)
// Pass the parent context down to the request to ensure that the resources
// will be release properly.
req = req.WithContext(ctx)
resp, err := client.Do(req)
if err != nil { if err != nil {
return nil, false, "", err return nil, false, "", err
} }

View File

@ -19,6 +19,7 @@ package rest
import ( import (
"bufio" "bufio"
"bytes" "bytes"
"context"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
@ -45,7 +46,7 @@ func TestInputStreamReader(t *testing.T) {
streamer := &LocationStreamer{ streamer := &LocationStreamer{
Location: u, Location: u,
} }
readCloser, _, _, err := streamer.InputStream("", "") readCloser, _, _, err := streamer.InputStream(context.Background(), "", "")
if err != nil { if err != nil {
t.Errorf("Unexpected error when getting stream: %v", err) t.Errorf("Unexpected error when getting stream: %v", err)
return return
@ -61,7 +62,7 @@ func TestInputStreamNullLocation(t *testing.T) {
streamer := &LocationStreamer{ streamer := &LocationStreamer{
Location: nil, Location: nil,
} }
readCloser, _, _, err := streamer.InputStream("", "") readCloser, _, _, err := streamer.InputStream(context.Background(), "", "")
if err != nil { if err != nil {
t.Errorf("Unexpected error when getting stream with null location: %v", err) t.Errorf("Unexpected error when getting stream with null location: %v", err)
} }
@ -91,7 +92,7 @@ func TestInputStreamContentType(t *testing.T) {
Location: location, Location: location,
Transport: fakeTransport("application/json", "hello world"), Transport: fakeTransport("application/json", "hello world"),
} }
readCloser, _, contentType, err := streamer.InputStream("", "") readCloser, _, contentType, err := streamer.InputStream(context.Background(), "", "")
if err != nil { if err != nil {
t.Errorf("Unexpected error when getting stream: %v", err) t.Errorf("Unexpected error when getting stream: %v", err)
return return
@ -109,7 +110,7 @@ func TestInputStreamTransport(t *testing.T) {
Location: location, Location: location,
Transport: fakeTransport("text/plain", message), Transport: fakeTransport("text/plain", message),
} }
readCloser, _, _, err := streamer.InputStream("", "") readCloser, _, _, err := streamer.InputStream(context.Background(), "", "")
if err != nil { if err != nil {
t.Errorf("Unexpected error when getting stream: %v", err) t.Errorf("Unexpected error when getting stream: %v", err)
return return
@ -136,7 +137,7 @@ func TestInputStreamInternalServerErrorTransport(t *testing.T) {
} }
expectedError := errors.NewInternalError(fmt.Errorf("%s", message)) expectedError := errors.NewInternalError(fmt.Errorf("%s", message))
_, _, _, err := streamer.InputStream("", "") _, _, _, err := streamer.InputStream(context.Background(), "", "")
if err == nil { if err == nil {
t.Errorf("unexpected non-error") t.Errorf("unexpected non-error")
return return

View File

@ -320,7 +320,7 @@ type ResourceStreamer interface {
// the caller may return a flag indicating whether the result should be flushed as writes occur // the caller may return a flag indicating whether the result should be flushed as writes occur
// and a content type string that indicates the type of the stream. // and a content type string that indicates the type of the stream.
// If a null stream is returned, a StatusNoContent response wil be generated. // If a null stream is returned, a StatusNoContent response wil be generated.
InputStream(apiVersion, acceptHeader string) (stream io.ReadCloser, flush bool, mimeType string, err error) InputStream(ctx context.Context, apiVersion, acceptHeader string) (stream io.ReadCloser, flush bool, mimeType string, err error)
} }
// StorageMetadata is an optional interface that callers can implement to provide additional // StorageMetadata is an optional interface that callers can implement to provide additional