Merge pull request #95002 from p0lyn0mial/upstream-supress-err-conn-killed

stop logging killing connection/stream because serving request timed out and response had been started
This commit is contained in:
Kubernetes Prow Robot 2020-11-10 12:37:51 -08:00 committed by GitHub
commit 40ef0ad6e9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 285 additions and 23 deletions

View File

@ -221,7 +221,7 @@ func buildHandlerChain(handler http.Handler, authn authenticator.Request, authz
handler = genericapifilters.WithAuthentication(handler, authn, failedHandler, nil)
handler = genericapifilters.WithRequestInfo(handler, requestInfoResolver)
handler = genericapifilters.WithCacheControl(handler)
handler = genericfilters.WithPanicRecovery(handler)
handler = genericfilters.WithPanicRecovery(handler, requestInfoResolver)
return handler
}

View File

@ -26,6 +26,7 @@ import (
// BuildInsecureHandlerChain sets up the server to listen to http. Should be removed.
func BuildInsecureHandlerChain(apiHandler http.Handler, c *server.Config) http.Handler {
requestInfoResolver := server.NewRequestInfoResolver(c)
handler := apiHandler
// Temporarily disable APIPriorityAndFairness during development
// so that /debug/pprof works even while this feature is totally
@ -40,10 +41,10 @@ func BuildInsecureHandlerChain(apiHandler http.Handler, c *server.Config) http.H
handler = genericfilters.WithCORS(handler, c.CorsAllowedOriginList, nil, nil, nil, "true")
handler = genericfilters.WithTimeoutForNonLongRunningRequests(handler, c.LongRunningFunc, c.RequestTimeout)
handler = genericfilters.WithWaitGroup(handler, c.LongRunningFunc, c.HandlerChainWaitGroup)
handler = genericapifilters.WithRequestInfo(handler, server.NewRequestInfoResolver(c))
handler = genericapifilters.WithRequestInfo(handler, requestInfoResolver)
handler = genericapifilters.WithWarningRecorder(handler)
handler = genericapifilters.WithCacheControl(handler)
handler = genericfilters.WithPanicRecovery(handler)
handler = genericfilters.WithPanicRecovery(handler, requestInfoResolver)
return handler
}

View File

@ -206,6 +206,16 @@ var (
[]string{"filter"},
)
// requestAbortsTotal is a number of aborted requests with http.ErrAbortHandler
requestAbortsTotal = compbasemetrics.NewCounterVec(
&compbasemetrics.CounterOpts{
Name: "apiserver_request_aborts_total",
Help: "Number of requests which apiserver aborted possibly due to a timeout, for each group, version, verb, resource, subresource and scope",
StabilityLevel: compbasemetrics.ALPHA,
},
[]string{"verb", "group", "version", "resource", "subresource", "scope"},
)
kubectlExeRegexp = regexp.MustCompile(`^.*((?i:kubectl\.exe))`)
metrics = []resettableCollector{
@ -224,6 +234,7 @@ var (
requestTerminationsTotal,
apiSelfRequestCounter,
requestFilterDuration,
requestAbortsTotal,
}
// these are the known (e.g. whitelisted/known) content types which we will report for
@ -317,6 +328,22 @@ func RecordFilterLatency(name string, elapsed time.Duration) {
requestFilterDuration.WithLabelValues(name).Observe(elapsed.Seconds())
}
// RecordRequestAbort records that the request was aborted possibly due to a timeout.
func RecordRequestAbort(req *http.Request, requestInfo *request.RequestInfo) {
if requestInfo == nil {
requestInfo = &request.RequestInfo{Verb: req.Method, Path: req.URL.Path}
}
scope := CleanScope(requestInfo)
reportedVerb := cleanVerb(canonicalVerb(strings.ToUpper(req.Method), scope), req)
resource := requestInfo.Resource
subresource := requestInfo.Subresource
group := requestInfo.APIGroup
version := requestInfo.APIVersion
requestAbortsTotal.WithLabelValues(reportedVerb, group, version, resource, subresource, scope).Inc()
}
// RecordRequestTermination records that the request was terminated early as part of a resource
// preservation or apiserver self-defense mechanism (e.g. timeouts, maxinflight throttling,
// proxyHandler errors). RecordRequestTermination should only be called zero or one times

View File

@ -756,7 +756,7 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
handler = genericapifilters.WithWarningRecorder(handler)
handler = genericapifilters.WithCacheControl(handler)
handler = genericapifilters.WithRequestReceivedTimestamp(handler)
handler = genericfilters.WithPanicRecovery(handler)
handler = genericfilters.WithPanicRecovery(handler, c.RequestInfoResolver)
return handler
}

View File

@ -36,6 +36,7 @@ go_test(
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics:go_default_library",
"//staging/src/k8s.io/component-base/metrics/legacyregistry:go_default_library",
"//vendor/golang.org/x/net/http2:go_default_library",
"//vendor/k8s.io/klog/v2:go_default_library",
],
)

View File

@ -33,8 +33,6 @@ import (
apirequest "k8s.io/apiserver/pkg/endpoints/request"
)
var errConnKilled = fmt.Errorf("killing connection/stream because serving request timed out and response had been started")
// WithTimeoutForNonLongRunningRequests times out non-long-running requests after the time given by timeout.
func WithTimeoutForNonLongRunningRequests(handler http.Handler, longRunning apirequest.LongRunningRequestCheck, timeout time.Duration) http.Handler {
if longRunning == nil {
@ -246,15 +244,17 @@ func (tw *baseTimeoutWriter) timeout(err *apierrors.StatusError) {
// no way to timeout the HTTP request at the point. We have to shutdown
// the connection for HTTP1 or reset stream for HTTP2.
//
// Note from: Brad Fitzpatrick
// if the ServeHTTP goroutine panics, that will do the best possible thing for both
// HTTP/1 and HTTP/2. In HTTP/1, assuming you're replying with at least HTTP/1.1 and
// you've already flushed the headers so it's using HTTP chunking, it'll kill the TCP
// connection immediately without a proper 0-byte EOF chunk, so the peer will recognize
// the response as bogus. In HTTP/2 the server will just RST_STREAM the stream, leaving
// the TCP connection open, but resetting the stream to the peer so it'll have an error,
// like the HTTP/1 case.
panic(errConnKilled)
// Note from the golang's docs:
// If ServeHTTP panics, the server (the caller of ServeHTTP) assumes
// that the effect of the panic was isolated to the active request.
// It recovers the panic, logs a stack trace to the server error log,
// and either closes the network connection or sends an HTTP/2
// RST_STREAM, depending on the HTTP protocol. To abort a handler so
// the client sees an interrupted response but the server doesn't log
// an error, panic with the value ErrAbortHandler.
//
// We are throwing http.ErrAbortHandler deliberately so that a client is notified and to suppress a not helpful stacktrace in the logs
panic(http.ErrAbortHandler)
}
}

View File

@ -17,22 +17,34 @@ limitations under the License.
package filters
import (
"bytes"
"crypto/tls"
"crypto/x509"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"net/http/httptest"
"net/http/httptrace"
"os"
"reflect"
"strings"
"sync"
"testing"
"time"
"golang.org/x/net/http2"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/diff"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/klog/v2"
)
type recorder struct {
@ -175,3 +187,209 @@ func TestTimeout(t *testing.T) {
t.Fatalf("expected to see a handler panic, but didn't")
}
}
func captureStdErr() (func() string, func(), error) {
var buf bytes.Buffer
reader, writer, err := os.Pipe()
if err != nil {
return nil, nil, err
}
stderr := os.Stderr
readerClosedCh := make(chan struct{})
stopReadingStdErr := func() string {
writer.Close()
<-readerClosedCh
return buf.String()
}
klog.LogToStderr(true)
cleanUp := func() {
os.Stderr = stderr
klog.LogToStderr(false)
stopReadingStdErr()
}
os.Stderr = writer
go func() {
io.Copy(&buf, reader)
readerClosedCh <- struct{}{}
close(readerClosedCh)
}()
klog.LogToStderr(true)
return stopReadingStdErr, cleanUp, nil
}
func TestErrConnKilled(t *testing.T) {
readStdErr, cleanUp, err := captureStdErr()
if err != nil {
t.Fatalf("unable to setup the test, err %v", err)
}
defer cleanUp()
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// this error must be ignored by the WithPanicRecovery handler
// it is thrown by WithTimeoutForNonLongRunningRequests handler when a response has been already sent to the client and the handler timed out
// panicking with http.ErrAbortHandler also suppresses logging of a stack trace to the server's error log and closes the underlying connection
w.Write([]byte("hello from the handler"))
panic(http.ErrAbortHandler)
})
resolver := &request.RequestInfoFactory{
APIPrefixes: sets.NewString("api", "apis"),
GrouplessAPIPrefixes: sets.NewString("api"),
}
ts := httptest.NewServer(WithPanicRecovery(handler, resolver))
defer ts.Close()
_, err = http.Get(ts.URL)
if err == nil {
t.Fatal("expected to receive an error")
}
capturedOutput := readStdErr()
if len(capturedOutput) > 0 {
t.Errorf("unexpected output captured actual = %v", capturedOutput)
}
}
type panicOnNonReuseTransport struct {
Transport http.RoundTripper
gotConnSeen bool
}
func (t *panicOnNonReuseTransport) RoundTrip(req *http.Request) (*http.Response, error) {
return t.Transport.RoundTrip(req)
}
func (t *panicOnNonReuseTransport) GotConn(info httptrace.GotConnInfo) {
if !t.gotConnSeen {
t.gotConnSeen = true
return
}
if !info.Reused {
panic(fmt.Sprintf("expected the connection to be reused, info %#v", info))
}
}
// TestErrConnKilledHTTP2 check if HTTP/2 connection is not closed when an HTTP handler panics
// The net/http library recovers the panic and sends an HTTP/2 RST_STREAM.
func TestErrConnKilledHTTP2(t *testing.T) {
readStdErr, cleanUp, err := captureStdErr()
if err != nil {
t.Fatalf("unable to setup the test, err %v", err)
}
defer cleanUp()
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// this error must be ignored by the WithPanicRecovery handler
// it is thrown by WithTimeoutForNonLongRunningRequests handler when a response has been already sent to the client and the handler timed out
// panicking with http.ErrAbortHandler also suppresses logging of a stack trace to the server's error log and closes the underlying connection
w.Write([]byte("hello from the handler"))
panic(http.ErrAbortHandler)
})
resolver := &request.RequestInfoFactory{
APIPrefixes: sets.NewString("api", "apis"),
GrouplessAPIPrefixes: sets.NewString("api"),
}
// test server
ts := httptest.NewUnstartedServer(WithPanicRecovery(handler, resolver))
tsCert, err := tls.X509KeyPair(tsCrt, tsKey)
if err != nil {
t.Fatalf("backend: invalid x509/key pair: %v", err)
}
ts.TLS = &tls.Config{
Certificates: []tls.Certificate{tsCert},
NextProtos: []string{http2.NextProtoTLS},
}
ts.StartTLS()
defer ts.Close()
newServerRequest := func(tr *panicOnNonReuseTransport) *http.Request {
req, _ := http.NewRequest("GET", fmt.Sprintf("https://127.0.0.1:%d", ts.Listener.Addr().(*net.TCPAddr).Port), nil)
trace := &httptrace.ClientTrace{
GotConn: tr.GotConn,
}
return req.WithContext(httptrace.WithClientTrace(req.Context(), trace))
}
// client
clientCACertPool := x509.NewCertPool()
clientCACertPool.AppendCertsFromPEM(tsCrt)
clientTLSConfig := &tls.Config{
RootCAs: clientCACertPool,
NextProtos: []string{http2.NextProtoTLS},
}
tr := &panicOnNonReuseTransport{}
client := &http.Client{}
tr.Transport = &http2.Transport{
TLSClientConfig: clientTLSConfig,
}
client.Transport = tr
// act
_, err = client.Do(newServerRequest(tr))
if err == nil {
t.Fatal("expected to receive an error")
}
capturedOutput := readStdErr()
if len(capturedOutput) > 0 {
t.Errorf("unexpected output captured actual = %v", capturedOutput)
}
// make another req to the server
// the connection should be reused
// the client uses a custom transport that checks and panics when the con wasn't reused.
_, err = client.Do(newServerRequest(tr))
if err == nil {
t.Fatal("expected to receive an error")
}
}
var tsCrt = []byte(`-----BEGIN CERTIFICATE-----
MIIDTjCCAjagAwIBAgIJAJdcQEBN2CjoMA0GCSqGSIb3DQEBCwUAMFAxCzAJBgNV
BAYTAlBMMQ8wDQYDVQQIDAZQb2xhbmQxDzANBgNVBAcMBkdkYW5zazELMAkGA1UE
CgwCU0sxEjAQBgNVBAMMCWxvY2FsaG9zdDAeFw0yMDA5MjgxMTU1MjhaFw0zMDA5
MjYxMTU1MjhaMFAxCzAJBgNVBAYTAlBMMQ8wDQYDVQQIDAZQb2xhbmQxDzANBgNV
BAcMBkdkYW5zazELMAkGA1UECgwCU0sxEjAQBgNVBAMMCWxvY2FsaG9zdDCCASIw
DQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAMr6b/uTHkIDEd88x3t3jnroOVwh
jWMwZ6qXN2NV/If1L9FNvtoZzZi6yCDE1uLdD1kWZ0R2XOPEwUPn+Z8A/lg9kF8J
GloLCF8q+XeYp8aWRKzwtdi+MPaKFf0wsuxEEHU4pypFrszNY0yLRbWAbMtgBFy0
KhyNGahFO9V69cRHUj6EJ9kSBg0nG5bsypon2rinzKpUrzAEl2MbM3F34Zit5yOv
rYQcbDME+9XmOJPD97XBvMZCbmPnmpst3tX7ZhdKgSKtIjoYt+d//wtPMXOhrRzM
xcc6HuIHAovtB4kvZl5wvVU8ra8DKZviYyjfW36kQHo+yFwP3XXZFWezZi0CAwEA
AaMrMCkwCQYDVR0TBAIwADALBgNVHQ8EBAMCBaAwDwYDVR0RBAgwBocEfwAAATAN
BgkqhkiG9w0BAQsFAAOCAQEAMoAlhZ6yRwPmFL2ql9ZYWqaPu2NC4dXYV6kUIXUA
pG3IqFWb3L4ePkgYBMDlJGpAJcXlSSgEGiLj+3qQojawIMzzxmqr2KX888S5Mr+9
I1qseRwcftwYqV/ewJSWE90HJ21pb1ixA6bSRJLV7DyxO6zKsdVJ4xIvehZtGbux
0RTf+8zUx8z2Goy1GUztOIqfMRt1P1hlQG0uvYsGQM84HO4+YhFwejrGaj8ajpgF
uo3B8BVHeh57FNGE6C45NkFGHq3tkNLMdAa32Az8DDvPmsJuycf6vgIfBEQxLZSF
OUKrKmtfdFv4XrInqFUYBYp5GkL8SGM2wmv6aSw9Aju4lA==
-----END CERTIFICATE-----`)
var tsKey = []byte(`-----BEGIN PRIVATE KEY-----
MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQDK+m/7kx5CAxHf
PMd7d4566DlcIY1jMGeqlzdjVfyH9S/RTb7aGc2YusggxNbi3Q9ZFmdEdlzjxMFD
5/mfAP5YPZBfCRpaCwhfKvl3mKfGlkSs8LXYvjD2ihX9MLLsRBB1OKcqRa7MzWNM
i0W1gGzLYARctCocjRmoRTvVevXER1I+hCfZEgYNJxuW7MqaJ9q4p8yqVK8wBJdj
GzNxd+GYrecjr62EHGwzBPvV5jiTw/e1wbzGQm5j55qbLd7V+2YXSoEirSI6GLfn
f/8LTzFzoa0czMXHOh7iBwKL7QeJL2ZecL1VPK2vAymb4mMo31t+pEB6PshcD911
2RVns2YtAgMBAAECggEAA2Qx0MtBeyrf9pHmZ1q1B7qvkqmA2kJpyQDjzQYXxRHE
rcOVx8EcnUupolqHmJzG798e9JbhsHCOJhtPIWf71++XZO8bAJwklKp8JpJnYzsJ
hLY0450x5jyiZ2uT4by1Za//owYtCID6AsJk9MZjivZcvEvKVFXLMvONL2DxkEj1
KaGQJh6/GT4jtNX07bW9+5w069KAAf+BNuqv8+Y/FseV3ovlpLTKjMV9xCCp9i62
PJs/hs5CW2X+JCE7OCLsAiu0JTpXYyHcLwYwnCONdvj6veiMWjRyNDr3ew5NeZNf
nGU4WX7mXjPd/1OvzJy6iyrBlAA63ZfFZYjWQnfsIQKBgQDmo3AMIr+9rE79NnaD
woQyO539YSO45KSM39/Xrp/NJVpOxtzgZrYo7O6f6kQ3S5zQOddy9Oj7gN3RXhZ7
Vi+Oja78ig7KUrqxcBiBGRsKZGm5CGdZ0EFd3rIEh4Qb+f+2c4f+6NWANb4kwvfq
K24c1o71+77lEVlzE2/L33K+mQKBgQDhTFr/f2e9gnRNX9bjF4p7DQI0RsFADjx0
jgJYHfm/lCIdH9vf6SmmvJv2E76Bqx9XVilhav/egqKO/wzJWHyNo2RFBXNqfwoF
UxRZKgqhcU52y2LKAYoTYfodktatZk74rinMDLmA6arnlAWQELk3Mx48DlND43Zc
DUHTKcJEtQKBgQDYdL1c9mPjnEqJxMqXwEAXcPJG8hr3lMaGXDoVjxL1EsBdvK9h
f6QoZq1RsiiRiMpEdnSotAfQutHzhA0vdeSuMnTvGJbm9Zu3mc+1oZ1KNJEwkh2F
Ijmm4rFKJPEs3IVMc8NHzrdJW6b3k2/e+yGduRR08e7nx0+e+7fpq+1hyQKBgHY9
l4h9+hkYjSdKhEG8yh3Ybu62r5eJoSremNZcLQXhnaHBZaj2+rgaRpP4OsRc5d71
RlRtTood72iy7KgDO6MuPGKJANDEiaLPvl8pVFj0WWS5S0iPVELl6dl5hheNGSck
aKVBjF3exKYzJlQ8oqgYuOZ18jcv+p9HCePkB6P9AoGBAJSYpkNDc/lnCpfIlxVw
n+VroX6QDIMZzC7BGiUSrmVsu6xEbI+8/C7ecN2oCZZLMj96EXe6j+np4zmkQezc
c1EwB7fNAiS0fWyE2RU6QAOZJ71bDpzQa4q4DxbOkYSybGPM/nqDRwovdjUnWeuM
+vrJUjAZAPHJcvos0iylnc8E
-----END PRIVATE KEY-----`)

View File

@ -17,22 +17,37 @@ limitations under the License.
package filters
import (
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/klog/v2"
"net/http"
"k8s.io/klog/v2"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apiserver/pkg/endpoints/metrics"
"k8s.io/apiserver/pkg/server/httplog"
)
// WithPanicRecovery wraps an http Handler to recover and log panics (except in the special case of http.ErrAbortHandler panics, which suppress logging).
func WithPanicRecovery(handler http.Handler) http.Handler {
func WithPanicRecovery(handler http.Handler, resolver request.RequestInfoResolver) http.Handler {
return withPanicRecovery(handler, func(w http.ResponseWriter, req *http.Request, err interface{}) {
if err == http.ErrAbortHandler {
// honor the http.ErrAbortHandler sentinel panic value:
// ErrAbortHandler is a sentinel panic value to abort a handler.
// While any panic from ServeHTTP aborts the response to the client,
// panicking with ErrAbortHandler also suppresses logging of a stack trace to the server's error log.
// Honor the http.ErrAbortHandler sentinel panic value
//
// If ServeHTTP panics, the server (the caller of ServeHTTP) assumes
// that the effect of the panic was isolated to the active request.
// It recovers the panic, logs a stack trace to the server error log,
// and either closes the network connection or sends an HTTP/2
// RST_STREAM, depending on the HTTP protocol. To abort a handler so
// the client sees an interrupted response but the server doesn't log
// an error, panic with the value ErrAbortHandler.
//
// Note that the ReallyCrash variable controls the behaviour of the HandleCrash function
// So it might actually crash, after calling the handlers
info, err := resolver.NewRequestInfo(req)
if err != nil {
metrics.RecordRequestAbort(req, nil)
return
}
metrics.RecordRequestAbort(req, info)
return
}
http.Error(w, "This request caused apiserver to panic. Look in the logs for details.", http.StatusInternalServerError)

View File

@ -48,7 +48,7 @@ func BuildHandlerChain(apiHandler http.Handler, authorizationInfo *apiserver.Aut
}
handler = genericapifilters.WithRequestInfo(handler, requestInfoResolver)
handler = genericapifilters.WithCacheControl(handler)
handler = genericfilters.WithPanicRecovery(handler)
handler = genericfilters.WithPanicRecovery(handler, requestInfoResolver)
return handler
}