Merge pull request #104920 from tkashem/response-writer-cleanup

apiserver: decorate http.ResponseWriter correctly
This commit is contained in:
Kubernetes Prow Robot 2021-10-05 00:53:09 -07:00 committed by GitHub
commit efa9029a0d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 722 additions and 191 deletions

View File

@ -31,7 +31,7 @@ import (
api "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apiserver/pkg/server/httplog"
"k8s.io/apiserver/pkg/endpoints/responsewriter"
"k8s.io/apiserver/pkg/util/wsstream"
)
@ -113,7 +113,7 @@ func handleWebSocketStreams(req *http.Request, w http.ResponseWriter, portForwar
},
})
conn.SetIdleTimeout(idleTimeout)
_, streams, err := conn.Open(httplog.Unlogged(req, w), req)
_, streams, err := conn.Open(responsewriter.GetOriginal(w), req)
if err != nil {
err = fmt.Errorf("unable to upgrade websocket connection: %v", err)
return err

View File

@ -22,7 +22,7 @@ import (
"time"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apiserver/pkg/server/httplog"
"k8s.io/apiserver/pkg/endpoints/responsewriter"
"k8s.io/apiserver/pkg/util/wsstream"
)
@ -95,7 +95,7 @@ func createWebSocketStreams(req *http.Request, w http.ResponseWriter, opts *Opti
},
})
conn.SetIdleTimeout(idleTimeout)
negotiatedProtocol, streams, err := conn.Open(httplog.Unlogged(req, w), req)
negotiatedProtocol, streams, err := conn.Open(responsewriter.GetOriginal(w), req)
if err != nil {
runtime.HandleError(fmt.Errorf("unable to upgrade websocket connection: %v", err))
return nil, false

View File

@ -32,6 +32,7 @@ import (
"k8s.io/apiserver/pkg/audit"
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/endpoints/responsewriter"
)
// WithAudit decorates a http.Handler with audit logging information for all the
@ -178,19 +179,11 @@ func decorateResponseWriter(ctx context.Context, responseWriter http.ResponseWri
omitStages: omitStages,
}
// check if the ResponseWriter we're wrapping is the fancy one we need
// or if the basic is sufficient
//lint:file-ignore SA1019 Keep supporting deprecated http.CloseNotifier
_, cn := responseWriter.(http.CloseNotifier)
_, fl := responseWriter.(http.Flusher)
_, hj := responseWriter.(http.Hijacker)
if cn && fl && hj {
return &fancyResponseWriterDelegator{delegate}
}
return delegate
return responsewriter.WrapForHTTP1Or2(delegate)
}
var _ http.ResponseWriter = &auditResponseWriter{}
var _ responsewriter.UserProvidedDecorator = &auditResponseWriter{}
// auditResponseWriter intercepts WriteHeader, sets it in the event. If the sink is set, it will
// create immediately an event (for long running requests).
@ -203,6 +196,10 @@ type auditResponseWriter struct {
omitStages []auditinternal.Stage
}
func (a *auditResponseWriter) Unwrap() http.ResponseWriter {
return a.ResponseWriter
}
func (a *auditResponseWriter) processCode(code int) {
a.once.Do(func() {
if a.event.ResponseStatus == nil {
@ -228,28 +225,11 @@ func (a *auditResponseWriter) WriteHeader(code int) {
a.ResponseWriter.WriteHeader(code)
}
// fancyResponseWriterDelegator implements http.CloseNotifier, http.Flusher and
// http.Hijacker which are needed to make certain http operation (e.g. watch, rsh, etc)
// working.
type fancyResponseWriterDelegator struct {
*auditResponseWriter
}
func (f *fancyResponseWriterDelegator) CloseNotify() <-chan bool {
return f.ResponseWriter.(http.CloseNotifier).CloseNotify()
}
func (f *fancyResponseWriterDelegator) Flush() {
f.ResponseWriter.(http.Flusher).Flush()
}
func (f *fancyResponseWriterDelegator) Hijack() (net.Conn, *bufio.ReadWriter, error) {
func (a *auditResponseWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) {
// fake a response status before protocol switch happens
f.processCode(http.StatusSwitchingProtocols)
a.processCode(http.StatusSwitchingProtocols)
return f.ResponseWriter.(http.Hijacker).Hijack()
// the outer ResponseWriter object returned by WrapForHTTP1Or2 implements
// http.Hijacker if the inner object (a.ResponseWriter) implements http.Hijacker.
return a.ResponseWriter.(http.Hijacker).Hijack()
}
var _ http.CloseNotifier = &fancyResponseWriterDelegator{}
var _ http.Flusher = &fancyResponseWriterDelegator{}
var _ http.Hijacker = &fancyResponseWriterDelegator{}

View File

@ -17,9 +17,7 @@ limitations under the License.
package filters
import (
"bufio"
"context"
"net"
"net/http"
"net/http/httptest"
"reflect"
@ -37,6 +35,7 @@ import (
"k8s.io/apiserver/pkg/audit/policy"
"k8s.io/apiserver/pkg/authentication/user"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/endpoints/responsewriter"
)
type fakeAuditSink struct {
@ -75,43 +74,46 @@ func (s *fakeAuditSink) Pop(timeout time.Duration) (*auditinternal.Event, error)
return result, err
}
type simpleResponseWriter struct{}
var _ http.ResponseWriter = &simpleResponseWriter{}
func (*simpleResponseWriter) WriteHeader(code int) {}
func (*simpleResponseWriter) Write(bs []byte) (int, error) { return len(bs), nil }
func (*simpleResponseWriter) Header() http.Header { return http.Header{} }
type fancyResponseWriter struct {
simpleResponseWriter
}
func (*fancyResponseWriter) CloseNotify() <-chan bool { return nil }
func (*fancyResponseWriter) Flush() {}
func (*fancyResponseWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) { return nil, nil, nil }
func TestConstructResponseWriter(t *testing.T) {
actual := decorateResponseWriter(context.Background(), &simpleResponseWriter{}, nil, nil, nil)
inner := &responsewriter.FakeResponseWriter{}
actual := decorateResponseWriter(context.Background(), inner, nil, nil, nil)
switch v := actual.(type) {
case *auditResponseWriter:
default:
t.Errorf("Expected auditResponseWriter, got %v", reflect.TypeOf(v))
}
if innerGot := actual.(responsewriter.UserProvidedDecorator).Unwrap(); inner != innerGot {
t.Errorf("Expected the decorator to return the inner http.ResponseWriter object")
}
actual = decorateResponseWriter(context.Background(), &fancyResponseWriter{}, nil, nil, nil)
switch v := actual.(type) {
case *fancyResponseWriterDelegator:
default:
t.Errorf("Expected fancyResponseWriterDelegator, got %v", reflect.TypeOf(v))
actual = decorateResponseWriter(context.Background(), &responsewriter.FakeResponseWriterFlusherCloseNotifier{}, nil, nil, nil)
//lint:file-ignore SA1019 Keep supporting deprecated http.CloseNotifier
if _, ok := actual.(http.CloseNotifier); !ok {
t.Errorf("Expected http.ResponseWriter to implement http.CloseNotifier")
}
if _, ok := actual.(http.Flusher); !ok {
t.Errorf("Expected the wrapper to implement http.Flusher")
}
if _, ok := actual.(http.Hijacker); ok {
t.Errorf("Expected http.ResponseWriter not to implement http.Hijacker")
}
actual = decorateResponseWriter(context.Background(), &responsewriter.FakeResponseWriterFlusherCloseNotifierHijacker{}, nil, nil, nil)
//lint:file-ignore SA1019 Keep supporting deprecated http.CloseNotifier
if _, ok := actual.(http.CloseNotifier); !ok {
t.Errorf("Expected http.ResponseWriter to implement http.CloseNotifier")
}
if _, ok := actual.(http.Flusher); !ok {
t.Errorf("Expected the wrapper to implement http.Flusher")
}
if _, ok := actual.(http.Hijacker); !ok {
t.Errorf("Expected http.ResponseWriter to implement http.Hijacker")
}
}
func TestDecorateResponseWriterWithoutChannel(t *testing.T) {
ev := &auditinternal.Event{}
actual := decorateResponseWriter(context.Background(), &simpleResponseWriter{}, ev, nil, nil)
actual := decorateResponseWriter(context.Background(), &responsewriter.FakeResponseWriter{}, ev, nil, nil)
// write status. This will not block because firstEventSentCh is nil
actual.WriteHeader(42)
@ -125,7 +127,7 @@ func TestDecorateResponseWriterWithoutChannel(t *testing.T) {
func TestDecorateResponseWriterWithImplicitWrite(t *testing.T) {
ev := &auditinternal.Event{}
actual := decorateResponseWriter(context.Background(), &simpleResponseWriter{}, ev, nil, nil)
actual := decorateResponseWriter(context.Background(), &responsewriter.FakeResponseWriter{}, ev, nil, nil)
// write status. This will not block because firstEventSentCh is nil
actual.Write([]byte("foo"))
@ -140,7 +142,7 @@ func TestDecorateResponseWriterWithImplicitWrite(t *testing.T) {
func TestDecorateResponseWriterChannel(t *testing.T) {
sink := &fakeAuditSink{}
ev := &auditinternal.Event{}
actual := decorateResponseWriter(context.Background(), &simpleResponseWriter{}, ev, sink, nil)
actual := decorateResponseWriter(context.Background(), &responsewriter.FakeResponseWriter{}, ev, sink, nil)
done := make(chan struct{})
go func() {

View File

@ -31,7 +31,6 @@ import (
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/endpoints/handlers/negotiation"
"k8s.io/apiserver/pkg/endpoints/metrics"
"k8s.io/apiserver/pkg/server/httplog"
"k8s.io/apiserver/pkg/util/wsstream"
"golang.org/x/net/websocket"
@ -166,8 +165,6 @@ func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
metrics.RegisteredWatchers.WithContext(req.Context()).WithLabelValues(kind.Group, kind.Version, kind.Kind).Inc()
defer metrics.RegisteredWatchers.WithContext(req.Context()).WithLabelValues(kind.Group, kind.Version, kind.Kind).Dec()
w = httplog.Unlogged(req, w)
if wsstream.IsWebSocketRequest(req) {
w.Header().Set("Content-Type", s.MediaType)
websocket.Handler(s.HandleWS).ServeHTTP(w, req)

View File

@ -17,9 +17,7 @@ limitations under the License.
package metrics
import (
"bufio"
"context"
"net"
"net/http"
"net/url"
"strconv"
@ -34,6 +32,7 @@ import (
"k8s.io/apiserver/pkg/audit"
"k8s.io/apiserver/pkg/authentication/user"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/endpoints/responsewriter"
"k8s.io/apiserver/pkg/features"
utilfeature "k8s.io/apiserver/pkg/util/feature"
compbasemetrics "k8s.io/component-base/metrics"
@ -486,16 +485,7 @@ func InstrumentRouteFunc(verb, group, version, resource, subresource, scope, com
delegate := &ResponseWriterDelegator{ResponseWriter: response.ResponseWriter}
//lint:file-ignore SA1019 Keep supporting deprecated http.CloseNotifier
_, cn := response.ResponseWriter.(http.CloseNotifier)
_, fl := response.ResponseWriter.(http.Flusher)
_, hj := response.ResponseWriter.(http.Hijacker)
var rw http.ResponseWriter
if cn && fl && hj {
rw = &fancyResponseWriterDelegator{delegate}
} else {
rw = delegate
}
rw := responsewriter.WrapForHTTP1Or2(delegate)
response.ResponseWriter = rw
routeFunc(req, response)
@ -513,15 +503,7 @@ func InstrumentHandlerFunc(verb, group, version, resource, subresource, scope, c
}
delegate := &ResponseWriterDelegator{ResponseWriter: w}
_, cn := w.(http.CloseNotifier)
_, fl := w.(http.Flusher)
_, hj := w.(http.Hijacker)
if cn && fl && hj {
w = &fancyResponseWriterDelegator{delegate}
} else {
w = delegate
}
w = responsewriter.WrapForHTTP1Or2(delegate)
handler(w, req)
@ -612,6 +594,9 @@ func cleanDryRun(u *url.URL) string {
return strings.Join(utilsets.NewString(dryRun...).List(), ",")
}
var _ http.ResponseWriter = (*ResponseWriterDelegator)(nil)
var _ responsewriter.UserProvidedDecorator = (*ResponseWriterDelegator)(nil)
// ResponseWriterDelegator interface wraps http.ResponseWriter to additionally record content-length, status-code, etc.
type ResponseWriterDelegator struct {
http.ResponseWriter
@ -621,6 +606,10 @@ type ResponseWriterDelegator struct {
wroteHeader bool
}
func (r *ResponseWriterDelegator) Unwrap() http.ResponseWriter {
return r.ResponseWriter
}
func (r *ResponseWriterDelegator) WriteHeader(code int) {
r.status = code
r.wroteHeader = true
@ -644,22 +633,6 @@ func (r *ResponseWriterDelegator) ContentLength() int {
return int(r.written)
}
type fancyResponseWriterDelegator struct {
*ResponseWriterDelegator
}
func (f *fancyResponseWriterDelegator) CloseNotify() <-chan bool {
return f.ResponseWriter.(http.CloseNotifier).CloseNotify()
}
func (f *fancyResponseWriterDelegator) Flush() {
f.ResponseWriter.(http.Flusher).Flush()
}
func (f *fancyResponseWriterDelegator) Hijack() (net.Conn, *bufio.ReadWriter, error) {
return f.ResponseWriter.(http.Hijacker).Hijack()
}
// Small optimization over Itoa
func codeToString(s int) string {
switch s {

View File

@ -22,6 +22,7 @@ import (
"testing"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/endpoints/responsewriter"
)
func TestCleanVerb(t *testing.T) {
@ -181,3 +182,14 @@ func TestCleanScope(t *testing.T) {
})
}
}
func TestResponseWriterDecorator(t *testing.T) {
decorator := &ResponseWriterDelegator{
ResponseWriter: &responsewriter.FakeResponseWriter{},
}
var w http.ResponseWriter = decorator
if inner := w.(responsewriter.UserProvidedDecorator).Unwrap(); inner != decorator.ResponseWriter {
t.Errorf("Expected the decorator to return the inner http.ResponseWriter object")
}
}

View File

@ -0,0 +1,54 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package responsewriter
import (
"bufio"
"net"
"net/http"
)
var _ http.ResponseWriter = &FakeResponseWriter{}
// FakeResponseWriter implements http.ResponseWriter,
// it is used for testing purpose only
type FakeResponseWriter struct{}
func (fw *FakeResponseWriter) Header() http.Header { return http.Header{} }
func (fw *FakeResponseWriter) WriteHeader(code int) {}
func (fw *FakeResponseWriter) Write(bs []byte) (int, error) { return len(bs), nil }
// For HTTP2 an http.ResponseWriter object implements
// http.Flusher and http.CloseNotifier.
// It is used for testing purpose only
type FakeResponseWriterFlusherCloseNotifier struct {
*FakeResponseWriter
}
func (fw *FakeResponseWriterFlusherCloseNotifier) Flush() {}
func (fw *FakeResponseWriterFlusherCloseNotifier) CloseNotify() <-chan bool { return nil }
// For HTTP/1.x an http.ResponseWriter object implements
// http.Flusher, http.CloseNotifier and http.Hijacker.
// It is used for testing purpose only
type FakeResponseWriterFlusherCloseNotifierHijacker struct {
*FakeResponseWriterFlusherCloseNotifier
}
func (fw *FakeResponseWriterFlusherCloseNotifierHijacker) Hijack() (net.Conn, *bufio.ReadWriter, error) {
return nil, nil, nil
}

View File

@ -0,0 +1,180 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package responsewriter
import (
"bufio"
"net"
"net/http"
)
// UserProvidedDecorator represensts a user (client that uses this package)
// provided decorator that wraps an inner http.ResponseWriter object.
// The user-provided decorator object must return the inner (decorated)
// http.ResponseWriter object via the Unwrap function.
type UserProvidedDecorator interface {
http.ResponseWriter
// Unwrap returns the inner http.ResponseWriter object associated
// with the user-provided decorator.
Unwrap() http.ResponseWriter
}
// WrapForHTTP1Or2 accepts a user-provided decorator of an "inner" http.responseWriter
// object and potentially wraps the user-provided decorator with a new http.ResponseWriter
// object that implements http.CloseNotifier, http.Flusher, and/or http.Hijacker by
// delegating to the user-provided decorator (if it implements the relevant method) or
// the inner http.ResponseWriter (otherwise), so that the returned http.ResponseWriter
// object implements the same subset of those interfaces as the inner http.ResponseWriter.
//
// This function handles the following three casses.
// - The inner ResponseWriter implements `http.CloseNotifier`, `http.Flusher`,
// and `http.Hijacker` (an HTTP/1.1 sever provides such a ResponseWriter).
// - The inner ResponseWriter implements `http.CloseNotifier` and `http.Flusher`
// but not `http.Hijacker` (an HTTP/2 server provides such a ResponseWriter).
// - All the other cases collapse to this one, in which the given ResponseWriter is returned.
//
// There are three applicable terms:
// - "outer": this is the ResponseWriter object returned by the WrapForHTTP1Or2 function.
// - "user-provided decorator" or "middle": this is the user-provided decorator
// that decorates an inner ResponseWriter object. A user-provided decorator
// implements the UserProvidedDecorator interface. A user-provided decorator
// may or may not implement http.CloseNotifier, http.Flusher or http.Hijacker.
// - "inner": the ResponseWriter that the user-provided decorator extends.
func WrapForHTTP1Or2(decorator UserProvidedDecorator) http.ResponseWriter {
// from go net/http documentation:
// The default HTTP/1.x and HTTP/2 ResponseWriter implementations support Flusher
// Handlers should always test for this ability at runtime.
//
// The Hijacker interface is implemented by ResponseWriters that allow an HTTP handler
// to take over the connection.
// The default ResponseWriter for HTTP/1.x connections supports Hijacker, but HTTP/2 connections
// intentionally do not. ResponseWriter wrappers may also not support Hijacker.
// Handlers should always test for this ability at runtime
//
// The CloseNotifier interface is implemented by ResponseWriters which allow detecting
// when the underlying connection has gone away.
// Deprecated: the CloseNotifier interface predates Go's context package.
// New code should use Request.Context instead.
inner := decorator.Unwrap()
if innerNotifierFlusher, ok := inner.(CloseNotifierFlusher); ok {
// for HTTP/2 request, the default ResponseWriter object (http2responseWriter)
// implements Flusher and CloseNotifier.
outerHTTP2 := outerWithCloseNotifyAndFlush{
UserProvidedDecorator: decorator,
InnerCloseNotifierFlusher: innerNotifierFlusher,
}
if innerHijacker, hijackable := inner.(http.Hijacker); hijackable {
// for HTTP/1.x request the default implementation of ResponseWriter
// also implement CloseNotifier, Flusher and Hijacker
return &outerWithCloseNotifyFlushAndHijack{
outerWithCloseNotifyAndFlush: outerHTTP2,
InnerHijacker: innerHijacker,
}
}
return outerHTTP2
}
// we should never be here for either http/1.x or http2 request
return decorator
}
// CloseNotifierFlusher is a combination of http.CloseNotifier and http.Flusher
// This applies to both http/1.x and http2 requests.
type CloseNotifierFlusher interface {
http.CloseNotifier
http.Flusher
}
// GetOriginal goes through the chain of wrapped http.ResponseWriter objects
// and returns the original http.ResponseWriter object provided to the first
// request handler in the filter chain.
func GetOriginal(w http.ResponseWriter) http.ResponseWriter {
decorator, ok := w.(UserProvidedDecorator)
if !ok {
return w
}
inner := decorator.Unwrap()
if inner == w {
// infinite cycle here, we should never be here though.
panic("http.ResponseWriter decorator chain has a cycle")
}
return GetOriginal(inner)
}
//lint:ignore SA1019 backward compatibility
var _ http.CloseNotifier = outerWithCloseNotifyAndFlush{}
var _ http.Flusher = outerWithCloseNotifyAndFlush{}
var _ http.ResponseWriter = outerWithCloseNotifyAndFlush{}
var _ UserProvidedDecorator = outerWithCloseNotifyAndFlush{}
// outerWithCloseNotifyAndFlush is the outer object that extends the
// user provied decorator with http.CloseNotifier and http.Flusher only.
type outerWithCloseNotifyAndFlush struct {
// UserProvidedDecorator is the user-provided object, it decorates
// an inner ResponseWriter object.
UserProvidedDecorator
// http.CloseNotifier and http.Flusher for the inner object
InnerCloseNotifierFlusher CloseNotifierFlusher
}
func (wr outerWithCloseNotifyAndFlush) CloseNotify() <-chan bool {
if notifier, ok := wr.UserProvidedDecorator.(http.CloseNotifier); ok {
return notifier.CloseNotify()
}
return wr.InnerCloseNotifierFlusher.CloseNotify()
}
func (wr outerWithCloseNotifyAndFlush) Flush() {
if flusher, ok := wr.UserProvidedDecorator.(http.Flusher); ok {
flusher.Flush()
return
}
wr.InnerCloseNotifierFlusher.Flush()
}
//lint:file-ignore SA1019 Keep supporting deprecated http.CloseNotifier
var _ http.CloseNotifier = outerWithCloseNotifyFlushAndHijack{}
var _ http.Flusher = outerWithCloseNotifyFlushAndHijack{}
var _ http.Hijacker = outerWithCloseNotifyFlushAndHijack{}
var _ http.ResponseWriter = outerWithCloseNotifyFlushAndHijack{}
var _ UserProvidedDecorator = outerWithCloseNotifyFlushAndHijack{}
// outerWithCloseNotifyFlushAndHijack is the outer object that extends the
// user-provided decorator with http.CloseNotifier, http.Flusher and http.Hijacker.
// This applies to http/1.x requests only.
type outerWithCloseNotifyFlushAndHijack struct {
outerWithCloseNotifyAndFlush
// http.Hijacker for the inner object
InnerHijacker http.Hijacker
}
func (wr outerWithCloseNotifyFlushAndHijack) Hijack() (net.Conn, *bufio.ReadWriter, error) {
if hijacker, ok := wr.UserProvidedDecorator.(http.Hijacker); ok {
return hijacker.Hijack()
}
return wr.InnerHijacker.Hijack()
}

View File

@ -0,0 +1,298 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package responsewriter
import (
"bufio"
"net"
"net/http"
"net/http/httptest"
"net/url"
"testing"
"time"
)
func TestWithHTTP1(t *testing.T) {
var originalWant http.ResponseWriter
counterGot := &counter{}
chain := func(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if originalWant == nil {
originalWant = w
}
assertCloseNotifierFlusherHijacker(t, true, w)
decorator := &fakeResponseWriterDecorator{
ResponseWriter: w,
counter: counterGot,
}
wrapped := WrapForHTTP1Or2(decorator)
assertCloseNotifierFlusherHijacker(t, true, wrapped)
originalGot := GetOriginal(wrapped)
if originalWant != originalGot {
t.Errorf("Expected GetOriginal to return the original ResponseWriter object")
return
}
h.ServeHTTP(wrapped, r)
})
}
// wrap the original http.ResponseWriter multiple times
handler := chain(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// at this point, the original ResponseWriter object has been wrapped three times
// so each decorator is expected to tick the count by one for each method.
defer counterGot.assert(t, &counter{FlushInvoked: 3, CloseNotifyInvoked: 3, HijackInvoked: 3})
//lint:ignore SA1019 backward compatibility
w.(http.CloseNotifier).CloseNotify()
w.(http.Flusher).Flush()
conn, _, err := w.(http.Hijacker).Hijack()
if err != nil {
t.Errorf("Expected Hijack to succeed, but got error: %v", err)
return
}
conn.Close()
}))
handler = chain(handler)
handler = chain(handler)
server := newServer(t, handler, false)
defer server.Close()
sendRequest(t, server)
}
func TestWithHTTP2(t *testing.T) {
var originalWant http.ResponseWriter
counterGot := &counter{}
chain := func(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if originalWant == nil {
originalWant = w
}
assertCloseNotifierFlusherHijacker(t, false, w)
decorator := &fakeResponseWriterDecorator{
ResponseWriter: w,
counter: counterGot,
}
wrapped := WrapForHTTP1Or2(decorator)
assertCloseNotifierFlusherHijacker(t, false, wrapped)
originalGot := GetOriginal(wrapped)
if originalWant != originalGot {
t.Errorf("Expected GetOriginal to return the original ResponseWriter object")
return
}
h.ServeHTTP(wrapped, r)
})
}
// wrap the original http.ResponseWriter multiple times
handler := chain(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// at this point, the original ResponseWriter object has been wrapped three times
// so each decorator is expected to tick the count by one for each method.
defer counterGot.assert(t, &counter{FlushInvoked: 3, CloseNotifyInvoked: 3, HijackInvoked: 0})
//lint:ignore SA1019 backward compatibility
w.(http.CloseNotifier).CloseNotify()
w.(http.Flusher).Flush()
}))
handler = chain(handler)
handler = chain(handler)
server := newServer(t, handler, true)
defer server.Close()
sendRequest(t, server)
}
func TestGetOriginal(t *testing.T) {
tests := []struct {
name string
wrap func() (http.ResponseWriter, http.ResponseWriter)
panicExpected bool
}{
{
name: "not wrapped",
wrap: func() (http.ResponseWriter, http.ResponseWriter) {
original := &FakeResponseWriter{}
return original, original
},
},
{
name: "wrapped once",
wrap: func() (http.ResponseWriter, http.ResponseWriter) {
original := &FakeResponseWriter{}
return original, &fakeResponseWriterDecorator{
ResponseWriter: original,
}
},
},
{
name: "wrapped multiple times",
wrap: func() (http.ResponseWriter, http.ResponseWriter) {
original := &FakeResponseWriter{}
return original, &fakeResponseWriterDecorator{
ResponseWriter: &fakeResponseWriterDecorator{
ResponseWriter: &fakeResponseWriterDecorator{
ResponseWriter: original,
},
},
}
},
},
{
name: "wraps itself",
wrap: func() (http.ResponseWriter, http.ResponseWriter) {
faulty := &fakeResponseWriterDecorator{}
faulty.ResponseWriter = faulty
return faulty, &fakeResponseWriterDecorator{
ResponseWriter: faulty,
}
},
panicExpected: true,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
originalExpected, wrapped := test.wrap()
func() {
defer func() {
err := recover()
switch {
case err != nil:
if !test.panicExpected {
t.Errorf("Expected no panic, but got: %v", err)
}
default:
if test.panicExpected {
t.Errorf("Expected a panic")
}
}
}()
originalGot := GetOriginal(wrapped)
if originalExpected != originalGot {
t.Errorf("Expected to get tehe original http.ResponseWriter object")
}
}()
})
}
}
func newServer(t *testing.T, h http.Handler, http2 bool) *httptest.Server {
server := httptest.NewUnstartedServer(h)
if http2 {
server.EnableHTTP2 = true
server.StartTLS()
} else {
server.Start()
}
_, err := url.Parse(server.URL)
if err != nil {
t.Fatalf("Expected the server to have a valid URL, but got: %s", server.URL)
}
return server
}
func sendRequest(t *testing.T, server *httptest.Server) {
req, err := http.NewRequest("GET", server.URL, nil)
if err != nil {
t.Fatalf("error creating request: %v", err)
}
client := server.Client()
client.Timeout = 30 * time.Second
_, err = client.Do(req)
if err != nil {
t.Fatalf("Unexpected non-nil err from client.Do: %v", err)
}
}
func assertCloseNotifierFlusherHijacker(t *testing.T, hijackableExpected bool, w http.ResponseWriter) {
// the http.ResponseWriter object for both http/1.x and http2
// implement http.Flusher and http.CloseNotifier
if _, ok := w.(http.Flusher); !ok {
t.Errorf("Expected the http.ResponseWriter object to implement http.Flusher")
}
//lint:ignore SA1019 backward compatibility
if _, ok := w.(http.CloseNotifier); !ok {
t.Errorf("Expected the http.ResponseWriter object to implement http.CloseNotifier")
}
// http/1.x implements http.Hijacker, not http2
if _, ok := w.(http.Hijacker); ok != hijackableExpected {
t.Errorf("Unexpected http.Hijacker implementation, expected: %t, but got: %t", hijackableExpected, ok)
}
}
type counter struct {
FlushInvoked int
HijackInvoked int
CloseNotifyInvoked int
}
func (c *counter) assert(t *testing.T, expected *counter) {
if expected.FlushInvoked != c.FlushInvoked {
t.Errorf("Expected Flush() count to match, wanted: %d, but got: %d", expected.FlushInvoked, c.FlushInvoked)
}
if expected.CloseNotifyInvoked != c.CloseNotifyInvoked {
t.Errorf("Expected CloseNotify() count to match, wanted: %d, but got: %d", expected.CloseNotifyInvoked, c.CloseNotifyInvoked)
}
if expected.HijackInvoked != c.HijackInvoked {
t.Errorf("Expected Hijack() count to match, wanted: %d, but got: %d", expected.HijackInvoked, c.HijackInvoked)
}
}
type fakeResponseWriterDecorator struct {
http.ResponseWriter
counter *counter
}
func (fw *fakeResponseWriterDecorator) Unwrap() http.ResponseWriter { return fw.ResponseWriter }
func (fw *fakeResponseWriterDecorator) Flush() {
if fw.counter != nil {
fw.counter.FlushInvoked++
}
fw.ResponseWriter.(http.Flusher).Flush()
}
func (fw *fakeResponseWriterDecorator) Hijack() (net.Conn, *bufio.ReadWriter, error) {
if fw.counter != nil {
fw.counter.HijackInvoked++
}
return fw.ResponseWriter.(http.Hijacker).Hijack()
}
func (fw *fakeResponseWriterDecorator) CloseNotify() <-chan bool {
if fw.counter != nil {
fw.counter.CloseNotifyInvoked++
}
//lint:ignore SA1019 backward compatibility
return fw.ResponseWriter.(http.CloseNotifier).CloseNotify()
}

View File

@ -30,6 +30,7 @@ import (
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apiserver/pkg/endpoints/metrics"
apirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/endpoints/responsewriter"
)
// WithTimeoutForNonLongRunningRequests times out non-long-running requests after the time given by timeout.
@ -90,7 +91,8 @@ func (t *timeoutHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// resultCh is used as both errCh and stopCh
resultCh := make(chan interface{})
tw := newTimeoutWriter(w)
var tw timeoutWriter
tw, w = newTimeoutWriter(w)
go func() {
defer func() {
err := recover()
@ -105,7 +107,7 @@ func (t *timeoutHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
resultCh <- err
}()
t.handler.ServeHTTP(tw, r)
t.handler.ServeHTTP(w, r)
}()
select {
case err := <-resultCh:
@ -146,24 +148,16 @@ type timeoutWriter interface {
timeout(*apierrors.StatusError)
}
func newTimeoutWriter(w http.ResponseWriter) timeoutWriter {
func newTimeoutWriter(w http.ResponseWriter) (timeoutWriter, http.ResponseWriter) {
base := &baseTimeoutWriter{w: w}
wrapped := responsewriter.WrapForHTTP1Or2(base)
_, notifiable := w.(http.CloseNotifier)
_, hijackable := w.(http.Hijacker)
switch {
case notifiable && hijackable:
return &closeHijackTimeoutWriter{base}
case notifiable:
return &closeTimeoutWriter{base}
case hijackable:
return &hijackTimeoutWriter{base}
default:
return base
}
return base, wrapped
}
var _ http.ResponseWriter = &baseTimeoutWriter{}
var _ responsewriter.UserProvidedDecorator = &baseTimeoutWriter{}
type baseTimeoutWriter struct {
w http.ResponseWriter
@ -176,6 +170,10 @@ type baseTimeoutWriter struct {
hijacked bool
}
func (tw *baseTimeoutWriter) Unwrap() http.ResponseWriter {
return tw.w
}
func (tw *baseTimeoutWriter) Header() http.Header {
tw.mu.Lock()
defer tw.mu.Unlock()
@ -210,9 +208,9 @@ func (tw *baseTimeoutWriter) Flush() {
return
}
if flusher, ok := tw.w.(http.Flusher); ok {
flusher.Flush()
}
// the outer ResponseWriter object returned by WrapForHTTP1Or2 implements
// http.Flusher if the inner object (tw.w) implements http.Flusher.
tw.w.(http.Flusher).Flush()
}
func (tw *baseTimeoutWriter) WriteHeader(code int) {
@ -259,7 +257,7 @@ func (tw *baseTimeoutWriter) timeout(err *apierrors.StatusError) {
}
}
func (tw *baseTimeoutWriter) closeNotify() <-chan bool {
func (tw *baseTimeoutWriter) CloseNotify() <-chan bool {
tw.mu.Lock()
defer tw.mu.Unlock()
@ -269,47 +267,24 @@ func (tw *baseTimeoutWriter) closeNotify() <-chan bool {
return done
}
// the outer ResponseWriter object returned by WrapForHTTP1Or2 implements
// http.CloseNotifier if the inner object (tw.w) implements http.CloseNotifier.
return tw.w.(http.CloseNotifier).CloseNotify()
}
func (tw *baseTimeoutWriter) hijack() (net.Conn, *bufio.ReadWriter, error) {
func (tw *baseTimeoutWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) {
tw.mu.Lock()
defer tw.mu.Unlock()
if tw.timedOut {
return nil, nil, http.ErrHandlerTimeout
}
// the outer ResponseWriter object returned by WrapForHTTP1Or2 implements
// http.Hijacker if the inner object (tw.w) implements http.Hijacker.
conn, rw, err := tw.w.(http.Hijacker).Hijack()
if err == nil {
tw.hijacked = true
}
return conn, rw, err
}
type closeTimeoutWriter struct {
*baseTimeoutWriter
}
func (tw *closeTimeoutWriter) CloseNotify() <-chan bool {
return tw.closeNotify()
}
type hijackTimeoutWriter struct {
*baseTimeoutWriter
}
func (tw *hijackTimeoutWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) {
return tw.hijack()
}
type closeHijackTimeoutWriter struct {
*baseTimeoutWriter
}
func (tw *closeHijackTimeoutWriter) CloseNotify() <-chan bool {
return tw.closeNotify()
}
func (tw *closeHijackTimeoutWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) {
return tw.hijack()
}

View File

@ -45,6 +45,7 @@ import (
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/endpoints/responsewriter"
"k8s.io/klog/v2"
)
@ -368,6 +369,17 @@ func TestErrConnKilledHTTP2(t *testing.T) {
}
}
func TestResponseWriterDecorator(t *testing.T) {
decorator := &baseTimeoutWriter{
w: &responsewriter.FakeResponseWriter{},
}
var w http.ResponseWriter = decorator
if inner := w.(responsewriter.UserProvidedDecorator).Unwrap(); inner != decorator.w {
t.Errorf("Expected the decorator to return the inner http.ResponseWriter object")
}
}
func isStackTraceLoggedByRuntime(message string) bool {
// Check the captured output for the following patterns to find out if the
// stack trace is included in the log:

View File

@ -29,7 +29,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/endpoints/metrics"
"k8s.io/apiserver/pkg/server/httplog"
"k8s.io/apiserver/pkg/endpoints/responsewriter"
"k8s.io/klog/v2"
)
@ -255,7 +255,7 @@ func handleRootHealth(name string, firstTimeHealthy func(), checks ...HealthChec
// always be verbose on failure
if len(failedChecks) > 0 {
klog.V(2).Infof("%s check failed: %s\n%v", strings.Join(failedChecks, ","), name, failedVerboseLogOutput.String())
http.Error(httplog.Unlogged(r, w), fmt.Sprintf("%s%s check failed", individualCheckOutput.String(), name), http.StatusInternalServerError)
http.Error(responsewriter.GetOriginal(w), fmt.Sprintf("%s%s check failed", individualCheckOutput.String(), name), http.StatusInternalServerError)
return
}

View File

@ -28,6 +28,7 @@ import (
"k8s.io/apiserver/pkg/endpoints/metrics"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/endpoints/responsewriter"
"k8s.io/klog/v2"
)
@ -66,12 +67,16 @@ type respLogger struct {
logStacktracePred StacktracePred
}
var _ http.ResponseWriter = &respLogger{}
var _ responsewriter.UserProvidedDecorator = &respLogger{}
func (rl *respLogger) Unwrap() http.ResponseWriter {
return rl.w
}
// Simple logger that logs immediately when Addf is called
type passthroughLogger struct{}
//lint:ignore SA1019 Interface implementation check to make sure we don't drop CloseNotifier again
var _ http.CloseNotifier = &respLogger{}
// Addf logs info immediately.
func (passthroughLogger) Addf(format string, data ...interface{}) {
klog.V(2).Info(fmt.Sprintf(format, data...))
@ -95,13 +100,15 @@ func WithLogging(handler http.Handler, pred StacktracePred) http.Handler {
startTime = receivedTimestamp
}
rl := newLoggedWithStartTime(req, w, startTime).StacktraceWhen(pred)
var rl *respLogger
rl, w = newLoggedWithStartTime(req, w, startTime)
rl.StacktraceWhen(pred)
req = req.WithContext(context.WithValue(ctx, respLoggerContextKey, rl))
if klog.V(3).Enabled() {
defer rl.Log()
}
handler.ServeHTTP(rl, req)
handler.ServeHTTP(w, req)
})
}
@ -118,17 +125,20 @@ func respLoggerFromRequest(req *http.Request) *respLogger {
return respLoggerFromContext(req.Context())
}
func newLoggedWithStartTime(req *http.Request, w http.ResponseWriter, startTime time.Time) *respLogger {
return &respLogger{
func newLoggedWithStartTime(req *http.Request, w http.ResponseWriter, startTime time.Time) (*respLogger, http.ResponseWriter) {
logger := &respLogger{
startTime: startTime,
req: req,
w: w,
logStacktracePred: DefaultStacktracePred,
}
rw := responsewriter.WrapForHTTP1Or2(logger)
return logger, rw
}
// newLogged turns a normal response writer into a logged response writer.
func newLogged(req *http.Request, w http.ResponseWriter) *respLogger {
func newLogged(req *http.Request, w http.ResponseWriter) (*respLogger, http.ResponseWriter) {
return newLoggedWithStartTime(req, w, time.Now())
}
@ -253,32 +263,18 @@ func (rl *respLogger) Write(b []byte) (int, error) {
return rl.w.Write(b)
}
// Flush implements http.Flusher even if the underlying http.Writer doesn't implement it.
// Flush is used for streaming purposes and allows to flush buffered data to the client.
func (rl *respLogger) Flush() {
if flusher, ok := rl.w.(http.Flusher); ok {
flusher.Flush()
} else if klog.V(2).Enabled() {
klog.InfoDepth(1, fmt.Sprintf("Unable to convert %+v into http.Flusher", rl.w))
}
}
// WriteHeader implements http.ResponseWriter.
func (rl *respLogger) WriteHeader(status int) {
rl.recordStatus(status)
rl.w.WriteHeader(status)
}
// Hijack implements http.Hijacker.
func (rl *respLogger) Hijack() (net.Conn, *bufio.ReadWriter, error) {
rl.hijacked = true
return rl.w.(http.Hijacker).Hijack()
}
// CloseNotify implements http.CloseNotifier
func (rl *respLogger) CloseNotify() <-chan bool {
//lint:ignore SA1019 There are places in the code base requiring the CloseNotifier interface to be implemented.
return rl.w.(http.CloseNotifier).CloseNotify()
// the outer ResponseWriter object returned by WrapForHTTP1Or2 implements
// http.Hijacker if the inner object (rl.w) implements http.Hijacker.
return rl.w.(http.Hijacker).Hijack()
}
func (rl *respLogger) recordStatus(status int) {

View File

@ -21,6 +21,8 @@ import (
"net/http/httptest"
"reflect"
"testing"
"k8s.io/apiserver/pkg/endpoints/responsewriter"
)
func TestDefaultStacktracePred(t *testing.T) {
@ -127,28 +129,22 @@ func TestUnlogged(t *testing.T) {
}
}
type testResponseWriter struct{}
func (*testResponseWriter) Header() http.Header { return nil }
func (*testResponseWriter) Write([]byte) (int, error) { return 0, nil }
func (*testResponseWriter) WriteHeader(int) {}
func TestLoggedStatus(t *testing.T) {
req, err := http.NewRequest("GET", "http://example.com", nil)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
var tw http.ResponseWriter = new(testResponseWriter)
logger := newLogged(req, tw)
var tw http.ResponseWriter = new(responsewriter.FakeResponseWriter)
logger, _ := newLogged(req, tw)
logger.Write(nil)
if logger.status != http.StatusOK {
t.Errorf("expected status after write to be %v, got %v", http.StatusOK, logger.status)
}
tw = new(testResponseWriter)
logger = newLogged(req, tw)
tw = new(responsewriter.FakeResponseWriter)
logger, _ = newLogged(req, tw)
logger.WriteHeader(http.StatusForbidden)
logger.Write(nil)
@ -156,3 +152,58 @@ func TestLoggedStatus(t *testing.T) {
t.Errorf("expected status after write to remain %v, got %v", http.StatusForbidden, logger.status)
}
}
func TestRespLoggerWithDecoratedResponseWriter(t *testing.T) {
req, err := http.NewRequest("GET", "http://example.com", nil)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
var tw http.ResponseWriter = new(responsewriter.FakeResponseWriter)
_, rwGot := newLogged(req, tw)
switch v := rwGot.(type) {
case *respLogger:
default:
t.Errorf("Expected respLogger, got %v", reflect.TypeOf(v))
}
tw = new(responsewriter.FakeResponseWriterFlusherCloseNotifier)
_, rwGot = newLogged(req, tw)
//lint:file-ignore SA1019 Keep supporting deprecated http.CloseNotifier
if _, ok := rwGot.(http.CloseNotifier); !ok {
t.Errorf("Expected http.ResponseWriter to implement http.CloseNotifier")
}
if _, ok := rwGot.(http.Flusher); !ok {
t.Errorf("Expected the wrapper to implement http.Flusher")
}
if _, ok := rwGot.(http.Hijacker); ok {
t.Errorf("Expected http.ResponseWriter not to implement http.Hijacker")
}
tw = new(responsewriter.FakeResponseWriterFlusherCloseNotifierHijacker)
_, rwGot = newLogged(req, tw)
//lint:file-ignore SA1019 Keep supporting deprecated http.CloseNotifier
if _, ok := rwGot.(http.CloseNotifier); !ok {
t.Errorf("Expected http.ResponseWriter to implement http.CloseNotifier")
}
if _, ok := rwGot.(http.Flusher); !ok {
t.Errorf("Expected the wrapper to implement http.Flusher")
}
if _, ok := rwGot.(http.Hijacker); !ok {
t.Errorf("Expected http.ResponseWriter to implement http.Hijacker")
}
}
func TestResponseWriterDecorator(t *testing.T) {
decorator := &respLogger{
w: &responsewriter.FakeResponseWriter{},
}
var w http.ResponseWriter = decorator
if inner := w.(responsewriter.UserProvidedDecorator).Unwrap(); inner != decorator.w {
t.Errorf("Expected the decorator to return the inner http.ResponseWriter object")
}
}

1
vendor/modules.txt vendored
View File

@ -1508,6 +1508,7 @@ k8s.io/apiserver/pkg/endpoints/handlers/responsewriters
k8s.io/apiserver/pkg/endpoints/metrics
k8s.io/apiserver/pkg/endpoints/openapi
k8s.io/apiserver/pkg/endpoints/request
k8s.io/apiserver/pkg/endpoints/responsewriter
k8s.io/apiserver/pkg/endpoints/warning
k8s.io/apiserver/pkg/features
k8s.io/apiserver/pkg/quota/v1